Wikipedia River: A river to index Wikipedia, closes #403.

This commit is contained in:
kimchy 2010-10-03 22:22:45 +02:00
parent 425744e0db
commit c4d17860a1
21 changed files with 1227 additions and 0 deletions
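Once the plugin is installed, the river is started the same way as other rivers: by indexing a _meta document into the _river index. The sketch below uses the Java client and assumes the river type resolves to "wikipedia"; the river name ("my_wiki_river") is an illustrative placeholder.

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import org.elasticsearch.client.Client;

public class RegisterWikipediaRiver {
    // Registers a wikipedia river; the node picks up the _meta document and starts streaming the dump.
    public static void register(Client client) throws Exception {
        client.prepareIndex("_river", "my_wiki_river", "_meta")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("type", "wikipedia")
                        .endObject())
                .execute().actionGet();
    }
}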

View File

@@ -148,6 +148,7 @@
<w>uuid</w>
<w>versioned</w>
<w>warmup</w>
<w>wikipedia</w>
<w>wildcards</w>
<w>xcontent</w>
<w>xson</w>

View File

@@ -14,6 +14,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-couchdb.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-couchdb.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-wikipedia.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-wikipedia.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" />

View File

@@ -24,6 +24,7 @@
<orderEntry type="module" module-name="plugin-river-twitter" />
<orderEntry type="module" module-name="plugin-river-couchdb" />
<orderEntry type="module" module-name="plugin-river-rabbitmq" />
<orderEntry type="module" module-name="plugin-river-wikipedia" />
<orderEntry type="module" module-name="test-integration" />
</component>
</module>

View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/river/wikipedia/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/river/wikipedia/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/river/wikipedia">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/river/wikipedia/src/main/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
</component>
</module>

View File

@@ -0,0 +1,136 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-river-wikipedia"
explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::Wikipedia", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar
//jar {
// from sourceSets.main.allJava
//}
configurations {
dists
distLib {
visible = false
transitive = false
}
}
dependencies {
compile project(':elasticsearch')
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
testCompile 'org.hamcrest:hamcrest-all:1.1'
}
test {
useTestNG()
jvmArgs = ["-ea", "-Xmx1024m"]
suiteName = project.name
listeners = ["org.elasticsearch.util.testng.Listeners"]
systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}
uploadArchives {
repositories.mavenDeployer {
configuration = configurations.deployerJars
repository(url: rootProject.mavenRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
pom.project {
inceptionYear '2009'
name 'elasticsearch-plugins-river-wikipedia'
description 'Wikipedia River Plugin for ElasticSearch'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
scm {
connection 'git://github.com/elasticsearch/elasticsearch.git'
developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
url 'http://github.com/elasticsearch/elasticsearch'
}
}
pom.whenConfigured {pom ->
pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
}
}
}

View File

@@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.river.wikipedia.WikipediaRiverPlugin

View File

@@ -0,0 +1,40 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.river.wikipedia;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class WikipediaRiverPlugin extends AbstractPlugin {
@Inject public WikipediaRiverPlugin() {
}
@Override public String name() {
return "river-wikipedia";
}
@Override public String description() {
return "River Wikipedia Plugin";
}
}

View File

@@ -0,0 +1,235 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
import org.elasticsearch.river.wikipedia.support.WikiPage;
import org.elasticsearch.river.wikipedia.support.WikiXMLParser;
import org.elasticsearch.river.wikipedia.support.WikiXMLParserFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (shay.banon)
*/
public class WikipediaRiver extends AbstractRiverComponent implements River {
private StringBuilder sb = new StringBuilder();
private final Client client;
private final URL url;
private final String indexName;
private final String typeName;
private final int bulkSize;
private final int dropThreshold;
private final AtomicInteger onGoingBulks = new AtomicInteger();
private volatile Thread thread;
private volatile boolean closed = false;
private volatile BulkRequestBuilder currentRequest;
@Inject public WikipediaRiver(RiverName riverName, RiverSettings settings, Client client) throws MalformedURLException {
super(riverName, settings);
this.client = client;
String url = "http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2";
if (settings.settings().containsKey("wikipedia")) {
Map<String, Object> wikipediaSettings = (Map<String, Object>) settings.settings().get("wikipedia");
url = XContentMapValues.nodeStringValue(wikipediaSettings.get("url"), url);
}
logger.info("creating wikipedia stream river for [{}]", url);
this.url = new URL(url);
if (settings.settings().containsKey("index")) {
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "page");
this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), 10);
} else {
indexName = riverName.name();
typeName = "page";
bulkSize = 100;
dropThreshold = 10;
}
}
@Override public void start() {
logger.info("starting twitter stream");
try {
client.admin().indices().prepareCreate(indexName).execute().actionGet();
} catch (Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// that's fine
} else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) {
// ok, not recovered yet..., let's start indexing and hope we recover by the first bulk
// TODO: a smarter logic can be to register for cluster event listener here, and only start sampling when the block is removed...
} else {
logger.warn("failed to create index [{}], disabling river...", e, indexName);
return;
}
}
currentRequest = client.prepareBulk();
WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(url);
try {
parser.setPageCallback(new PageCallback());
} catch (Exception e) {
logger.error("failed to create parser", e);
return;
}
thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "wikipedia_slurper").newThread(new Parser(parser));
thread.start();
}
@Override public void close() {
logger.info("closing wikipedia river");
closed = true;
if (thread != null) {
thread.interrupt();
}
}
private class Parser implements Runnable {
private final WikiXMLParser parser;
private Parser(WikiXMLParser parser) {
this.parser = parser;
}
@Override public void run() {
try {
parser.parse();
} catch (Exception e) {
if (closed) {
return;
}
logger.error("failed to parse stream", e);
}
}
}
private class PageCallback implements PageCallbackHandler {
@Override public void process(WikiPage page) {
if (closed) {
return;
}
String title = stripTitle(page.getTitle());
if (logger.isTraceEnabled()) {
logger.trace("page {} : {}", page.getID(), page.getTitle());
}
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("title", title);
builder.field("text", page.getText());
builder.field("redirect", page.isRedirect());
builder.field("special", page.isSpecialPage());
builder.field("stub", page.isStub());
builder.field("disambiguation", page.isDisambiguationPage());
builder.startArray("category");
for (String s : page.getCategories()) {
builder.value(s);
}
builder.endArray();
builder.startArray("link");
for (String s : page.getLinks()) {
builder.value(s);
}
builder.endArray();
builder.endObject();
// For now, we index (and not create) since we need to keep track of what we indexed...
currentRequest.add(Requests.indexRequest(indexName).type(typeName).id(page.getID()).create(false).source(builder));
processBulkIfNeeded();
} catch (Exception e) {
logger.warn("failed to construct index request", e);
}
}
private void processBulkIfNeeded() {
if (currentRequest.numberOfActions() >= bulkSize) {
// execute the bulk operation
int currentOnGoingBulks = onGoingBulks.incrementAndGet();
if (currentOnGoingBulks > dropThreshold) {
// TODO, just wait here!, we can slow down the wikipedia parsing
onGoingBulks.decrementAndGet();
logger.warn("dropping bulk, [{}] crossed threshold [{}]", onGoingBulks, dropThreshold);
} else {
try {
currentRequest.execute(new ActionListener<BulkResponse>() {
@Override public void onResponse(BulkResponse bulkResponse) {
onGoingBulks.decrementAndGet();
}
@Override public void onFailure(Throwable e) {
onGoingBulks.decrementAndGet();
logger.warn("failed to execute bulk", e);
}
});
} catch (Exception e) {
logger.warn("failed to process bulk", e);
}
}
currentRequest = client.prepareBulk();
}
}
}
private String stripTitle(String title) {
sb.setLength(0);
sb.append(title);
while (sb.length() > 0 && (sb.charAt(sb.length() - 1) == '\n' || (sb.charAt(sb.length() - 1) == ' '))) {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
}
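The constructor above reads an optional "wikipedia" object for the dump "url" and an optional "index" object for the target index and type names; in that branch, "bulk_size" and "drop_threshold" are taken from the top-level river settings. A hedged sketch of a _meta source exercising those keys (all values illustrative; it would be indexed into _river/<name>/_meta as in the registration sketch under the commit header):

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import org.elasticsearch.common.xcontent.XContentBuilder;

public class WikipediaRiverMeta {
    // Builds a river definition with a custom dump URL, target index/type, and bulk settings.
    public static XContentBuilder source() throws Exception {
        return jsonBuilder()
                .startObject()
                .field("type", "wikipedia")
                .startObject("wikipedia")
                .field("url", "http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2")
                .endObject()
                .startObject("index")
                .field("index", "wikipedia")
                .field("type", "page")
                .endObject()
                .field("bulk_size", 500)       // read only when the "index" object is present
                .field("drop_threshold", 20)
                .endObject();
    }
}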

View File

@@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;
/**
* @author kimchy (shay.banon)
*/
public class WikipediaRiverModule extends AbstractModule {
@Override protected void configure() {
bind(River.class).to(WikipediaRiver.class).asEagerSingleton();
}
}

View File

@@ -0,0 +1,37 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
/**
* A class abstracting a Wiki infobox
*
* @author Delip Rao
*/
public class InfoBox {
String infoBoxWikiText = null;
InfoBox(String infoBoxWikiText) {
this.infoBoxWikiText = infoBoxWikiText;
}
public String dumpRaw() {
return infoBoxWikiText;
}
}

View File

@@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
public class IteratorHandler implements PageCallbackHandler {
private WikiXMLParser parser = null;
public IteratorHandler(WikiXMLParser myParser) {
parser = myParser;
}
public void process(WikiPage page) {
parser.notifyPage(page);
}
}

View File

@@ -0,0 +1,26 @@
package org.elasticsearch.river.wikipedia.support;
/**
* Interface to allow streamed processing of pages.
* This allows a SAX style processing of Wikipedia XML files.
* The registered callback is executed on each page
* element in the XML file.
* <p>
* Using callbacks consumes less memory, a useful feature for large
* dumps like English and German.
*
* @author Delip Rao
* @see WikiXMLSAXParser
* @see WikiPage
*/
public interface PageCallbackHandler {
/**
* This is the callback method that should be implemented before
* registering with <code>WikiXMLSAXParser</code>
*
* @param page a wikipedia page object
* @see WikiPage
*/
public void process(WikiPage page);
}
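As a worked example of the callback style described above, here is a minimal sketch of a handler that prints page titles, wired through the SAX parser added in this commit; the handler class name and the use of the full English dump are illustrative.

import java.net.URL;

import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
import org.elasticsearch.river.wikipedia.support.WikiPage;
import org.elasticsearch.river.wikipedia.support.WikiXMLParser;
import org.elasticsearch.river.wikipedia.support.WikiXMLParserFactory;

public class TitlePrintingHandler implements PageCallbackHandler {
    @Override public void process(WikiPage page) {
        // Invoked once per <page> element as the dump is streamed.
        System.out.println(page.getID() + " " + page.getTitle());
    }

    public static void main(String[] args) throws Exception {
        URL dump = new URL("http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2");
        WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(dump);
        parser.setPageCallback(new TitlePrintingHandler());
        parser.parse();
    }
}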

View File

@@ -0,0 +1,60 @@
package org.elasticsearch.river.wikipedia.support;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;
/**
* A Wrapper class for the PageCallbackHandler
*
* @author Jason Smith
*/
public class SAXPageCallbackHandler extends DefaultHandler {
private PageCallbackHandler pageHandler;
private WikiPage currentPage;
private String currentTag;
private String currentWikitext;
private String currentTitle;
private String currentID;
public SAXPageCallbackHandler(PageCallbackHandler ph) {
pageHandler = ph;
}
public void startElement(String uri, String name, String qName, Attributes attr) {
currentTag = qName;
if (qName.equals("page")) {
currentPage = new WikiPage();
currentWikitext = "";
currentTitle = "";
currentID = "";
}
}
public void endElement(String uri, String name, String qName) {
if (qName.equals("page")) {
currentPage.setTitle(currentTitle);
currentPage.setID(currentID);
currentPage.setWikiText(currentWikitext);
pageHandler.process(currentPage);
}
if (qName.equals("mediawiki")) {
// TODO hasMoreElements() should now return false
}
}
public void characters(char ch[], int start, int length) {
if (currentTag.equals("title")) {
currentTitle = currentTitle.concat(new String(ch, start, length));
}
// TODO: To avoid looking at the revision ID, only the first ID is taken.
// I'm not sure how big the block size is in each call to characters(),
// so this may be unsafe.
else if ((currentTag.equals("id")) && (currentID.length() == 0)) {
currentID = new String(ch, start, length);
} else if (currentTag.equals("text")) {
currentWikitext = currentWikitext.concat(new String(ch, start, length));
}
}
}

View File

@@ -0,0 +1,150 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
import java.util.List;
/**
* Data structures for a wikipedia page.
*
* @author Delip Rao
*/
public class WikiPage {
private String title = null;
private WikiTextParser wikiTextParser = null;
private String id = null;
/**
* Set the page title. This is not intended for direct use.
*
* @param title
*/
public void setTitle(String title) {
this.title = title;
}
/**
* Set the wiki text associated with this page.
* This setter also introduces side effects. This is not intended for direct use.
*
* @param wtext wiki-formatted text
*/
public void setWikiText(String wtext) {
wikiTextParser = new WikiTextParser(wtext);
}
/**
* @return a string containing the page title.
*/
public String getTitle() {
return title;
}
/**
* @param languageCode
* @return a string containing the title translated
* in the given languageCode.
*/
public String getTranslatedTitle(String languageCode) {
return wikiTextParser.getTranslatedTitle(languageCode);
}
/**
* @return true if this a disambiguation page.
*/
public boolean isDisambiguationPage() {
return title.contains("(disambiguation)") || wikiTextParser.isDisambiguationPage();
}
/**
* @return true for "special pages" -- like Category:, Wikipedia:, etc
*/
public boolean isSpecialPage() {
return title.contains(":");
}
/**
* Use this method to get the wiki text associated with this page.
* Useful for custom processing the wiki text.
*
* @return a string containing the wiki text.
*/
public String getWikiText() {
return wikiTextParser.getText();
}
/**
* @return true if this is a redirection page
*/
public boolean isRedirect() {
return wikiTextParser.isRedirect();
}
/**
* @return true if this is a stub page
*/
public boolean isStub() {
return wikiTextParser.isStub();
}
/**
* @return the title of the page being redirected to.
*/
public String getRedirectPage() {
return wikiTextParser.getRedirectText();
}
/**
* @return plain text stripped of all wiki formatting.
*/
public String getText() {
return wikiTextParser.getPlainText();
}
/**
* @return a list of categories the page belongs to, null if this is a redirection/disambiguation page
*/
public List<String> getCategories() {
return wikiTextParser.getCategories();
}
/**
* @return a list of links contained in the page
*/
public List<String> getLinks() {
return wikiTextParser.getLinks();
}
public void setID(String id) {
this.id = id;
}
public InfoBox getInfoBox() {
return wikiTextParser.getInfoBox();
}
public String getID() {
return id;
}
}

View File

@@ -0,0 +1,47 @@
package org.elasticsearch.river.wikipedia.support;
import java.util.Vector;
/**
* A class to iterate the pages after the wikipedia XML file has been parsed with {@link WikiXMLParser}.
*
* @author Delip Rao
* @see WikiXMLParser
*/
public class WikiPageIterator {
private int currentPage = 0;
private int lastPage = 0;
Vector<WikiPage> pageList = null;
public WikiPageIterator(Vector<WikiPage> list) {
pageList = list;
if (pageList != null)
lastPage = pageList.size();
}
/**
* @return true if there are more pages to be read
*/
public boolean hasMorePages() {
return (currentPage < lastPage);
}
/**
* Reset the iterator.
*/
public void reset() {
currentPage = 0;
}
/**
* Advances the iterator by one position.
*
* @return a {@link WikiPage}
*/
public WikiPage nextPage() {
if (hasMorePages())
return pageList.elementAt(currentPage++);
return null;
}
}
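A minimal usage sketch of the iterator; note that the SAX parser in this commit does not build a page list (its getIterator() is unsupported), so the Vector here is a hypothetical, pre-built one.

import java.util.Vector;

import org.elasticsearch.river.wikipedia.support.WikiPage;
import org.elasticsearch.river.wikipedia.support.WikiPageIterator;

public class IterateExample {
    public static void printTitles(Vector<WikiPage> pages) {
        WikiPageIterator it = new WikiPageIterator(pages);
        while (it.hasMorePages()) {
            // nextPage() advances the cursor; it returns null once the list is exhausted.
            System.out.println(it.nextPage().getTitle());
        }
    }
}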

View File

@@ -0,0 +1,196 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* For internal use only -- Used by the {@link WikiPage} class.
* Can also be used as a stand-alone class to parse wiki formatted text.
*
* @author Delip Rao
*/
public class WikiTextParser {
private String wikiText = null;
private ArrayList<String> pageCats = null;
private ArrayList<String> pageLinks = null;
private boolean redirect = false;
private String redirectString = null;
private static Pattern redirectPattern =
Pattern.compile("#REDIRECT\\s+\\[\\[(.*?)\\]\\]");
private boolean stub = false;
private boolean disambiguation = false;
private static Pattern stubPattern = Pattern.compile("\\-stub\\}\\}");
private static Pattern disambCatPattern = Pattern.compile("\\{\\{disambig\\}\\}");
private InfoBox infoBox = null;
public WikiTextParser(String wtext) {
wikiText = wtext;
Matcher matcher = redirectPattern.matcher(wikiText);
if (matcher.find()) {
redirect = true;
if (matcher.groupCount() == 1)
redirectString = matcher.group(1);
}
matcher = stubPattern.matcher(wikiText);
stub = matcher.find();
matcher = disambCatPattern.matcher(wikiText);
disambiguation = matcher.find();
}
public boolean isRedirect() {
return redirect;
}
public boolean isStub() {
return stub;
}
public String getRedirectText() {
return redirectString;
}
public String getText() {
return wikiText;
}
public ArrayList<String> getCategories() {
if (pageCats == null) parseCategories();
return pageCats;
}
public ArrayList<String> getLinks() {
if (pageLinks == null) parseLinks();
return pageLinks;
}
private void parseCategories() {
pageCats = new ArrayList<String>();
Pattern catPattern = Pattern.compile("\\[\\[Category:(.*?)\\]\\]", Pattern.MULTILINE);
Matcher matcher = catPattern.matcher(wikiText);
while (matcher.find()) {
String[] temp = matcher.group(1).split("\\|");
pageCats.add(temp[0]);
}
}
private void parseLinks() {
pageLinks = new ArrayList<String>();
Pattern catPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE);
Matcher matcher = catPattern.matcher(wikiText);
while (matcher.find()) {
String[] temp = matcher.group(1).split("\\|");
if (temp == null || temp.length == 0) continue;
String link = temp[0];
if (link.contains(":") == false) {
pageLinks.add(link);
}
}
}
public String getPlainText() {
String text = wikiText.replaceAll("&gt;", ">");
text = text.replaceAll("&lt;", "<");
text = text.replaceAll("<ref>.*?</ref>", " ");
text = text.replaceAll("</?.*?>", " ");
text = text.replaceAll("\\{\\{.*?\\}\\}", " ");
text = text.replaceAll("\\[\\[.*?:.*?\\]\\]", " ");
text = text.replaceAll("\\[\\[(.*?)\\]\\]", "$1");
text = text.replaceAll("\\s(.*?)\\|(\\w+\\s)", " $2");
text = text.replaceAll("\\[.*?\\]", " ");
text = text.replaceAll("\\'+", "");
return text;
}
public InfoBox getInfoBox() {
//parseInfoBox is expensive. Doing it only once like other parse* methods
if (infoBox == null)
infoBox = parseInfoBox();
return infoBox;
}
private InfoBox parseInfoBox() {
String INFOBOX_CONST_STR = "{{Infobox";
int startPos = wikiText.indexOf(INFOBOX_CONST_STR);
if (startPos < 0) return null;
int bracketCount = 2;
int endPos = startPos + INFOBOX_CONST_STR.length();
for (; endPos < wikiText.length(); endPos++) {
switch (wikiText.charAt(endPos)) {
case '}':
bracketCount--;
break;
case '{':
bracketCount++;
break;
default:
}
if (bracketCount == 0) break;
}
String infoBoxText = wikiText.substring(startPos, endPos + 1);
infoBoxText = stripCite(infoBoxText); // strip clumsy {{cite}} tags
// strip any html formatting
infoBoxText = infoBoxText.replaceAll("&gt;", ">");
infoBoxText = infoBoxText.replaceAll("&lt;", "<");
infoBoxText = infoBoxText.replaceAll("<ref.*?>.*?</ref>", " ");
infoBoxText = infoBoxText.replaceAll("</?.*?>", " ");
return new InfoBox(infoBoxText);
}
private String stripCite(String text) {
String CITE_CONST_STR = "{{cite";
int startPos = text.indexOf(CITE_CONST_STR);
if (startPos < 0) return text;
int bracketCount = 2;
int endPos = startPos + CITE_CONST_STR.length();
for (; endPos < text.length(); endPos++) {
switch (text.charAt(endPos)) {
case '}':
bracketCount--;
break;
case '{':
bracketCount++;
break;
default:
}
if (bracketCount == 0) break;
}
text = text.substring(0, startPos) + text.substring(endPos);
return stripCite(text);
}
public boolean isDisambiguationPage() {
return disambiguation;
}
public String getTranslatedTitle(String languageCode) {
Pattern pattern = Pattern.compile("^\\[\\[" + languageCode + ":(.*?)\\]\\]$", Pattern.MULTILINE);
Matcher matcher = pattern.matcher(wikiText);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
}
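Since the javadoc notes the class can be used stand-alone, here is a small sketch over illustrative wiki markup, showing the plain-text, category, and link extraction implemented above.

import java.util.List;

import org.elasticsearch.river.wikipedia.support.WikiTextParser;

public class WikiTextExample {
    public static void main(String[] args) {
        // Illustrative markup: bold text, one internal link, and one category tag.
        String markup = "'''Example''' is a [[sample]] article.\n[[Category:Examples]]\n";
        WikiTextParser parser = new WikiTextParser(markup);
        System.out.println(parser.getPlainText());         // wiki formatting stripped
        List<String> categories = parser.getCategories();  // ["Examples"]
        List<String> links = parser.getLinks();            // ["sample"]; links containing ':' are skipped
        System.out.println(categories + " " + links);
    }
}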

View File

@@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
import org.elasticsearch.common.compress.bzip2.CBZip2InputStream;
import org.xml.sax.InputSource;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.zip.GZIPInputStream;
/**
* @author Delip Rao
* @author Jason Smith
*/
public abstract class WikiXMLParser {
private URL wikiXMLFile = null;
protected WikiPage currentPage = null;
public WikiXMLParser(URL fileName) {
wikiXMLFile = fileName;
}
/**
* Set a callback handler. The callback is executed every time a
* page instance is detected in the stream. Custom handlers are
* implementations of {@link PageCallbackHandler}
*
* @param handler
* @throws Exception
*/
public abstract void setPageCallback(PageCallbackHandler handler) throws Exception;
/**
* The main parse method.
*
* @throws Exception
*/
public abstract void parse() throws Exception;
/**
* @return an iterator to the list of pages
* @throws Exception
*/
public abstract WikiPageIterator getIterator() throws Exception;
/**
* @return An InputSource created from wikiXMLFile
* @throws Exception
*/
protected InputSource getInputSource() throws Exception {
BufferedReader br = null;
if (wikiXMLFile.toExternalForm().endsWith(".gz")) {
br = new BufferedReader(new InputStreamReader(new GZIPInputStream(wikiXMLFile.openStream()), "UTF-8"));
} else if (wikiXMLFile.toExternalForm().endsWith(".bz2")) {
InputStream fis = wikiXMLFile.openStream();
byte[] ignoreBytes = new byte[2];
fis.read(ignoreBytes); //"B", "Z" bytes from commandline tools
br = new BufferedReader(new InputStreamReader(new CBZip2InputStream(fis), "UTF-8"));
} else {
br = new BufferedReader(new InputStreamReader(wikiXMLFile.openStream(), "UTF-8"));
}
return new InputSource(br);
}
protected void notifyPage(WikiPage page) {
currentPage = page;
}
}

View File

@@ -0,0 +1,14 @@
package org.elasticsearch.river.wikipedia.support;
import java.net.URL;
/**
* @author Delip Rao
*/
public class WikiXMLParserFactory {
public static WikiXMLParser getSAXParser(URL fileName) {
return new WikiXMLSAXParser(fileName);
}
}

View File

@@ -0,0 +1,97 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.wikipedia.support;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
import java.net.URL;
/**
* A SAX Parser for Wikipedia XML dumps.
*
* @author Jason Smith
*/
public class WikiXMLSAXParser extends WikiXMLParser {
private XMLReader xmlReader;
private PageCallbackHandler pageHandler = null;
public WikiXMLSAXParser(URL fileName) {
super(fileName);
try {
xmlReader = XMLReaderFactory.createXMLReader();
pageHandler = new IteratorHandler(this);
} catch (SAXException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Set a callback handler. The callback is executed every time a
* page instance is detected in the stream. Custom handlers are
* implementations of {@link PageCallbackHandler}
*
* @param handler
* @throws Exception
*/
public void setPageCallback(PageCallbackHandler handler) throws Exception {
pageHandler = handler;
}
/**
* The main parse method.
*
* @throws Exception
*/
public void parse() throws Exception {
xmlReader.setContentHandler(new SAXPageCallbackHandler(pageHandler));
xmlReader.parse(getInputSource());
}
/**
* This parser is event driven, so it
* can't provide a page iterator.
*/
@Override
public WikiPageIterator getIterator() throws Exception {
if (!(pageHandler instanceof IteratorHandler)) {
throw new Exception("Custom page callback found. Will not iterate.");
}
throw new UnsupportedOperationException();
}
/**
* A convenience method for the Wikipedia SAX interface
*
* @param dumpFile - path to the Wikipedia dump
* @param handler - callback handler used for parsing
* @throws Exception
*/
public static void parseWikipediaDump(URL dumpFile,
PageCallbackHandler handler) throws Exception {
WikiXMLParser wxsp = WikiXMLParserFactory.getSAXParser(dumpFile);
wxsp.setPageCallback(handler);
wxsp.parse();
}
}
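For one-shot processing, the static parseWikipediaDump convenience above can be called with an anonymous handler; a minimal sketch (the dump URL is illustrative):

import java.net.URL;

import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
import org.elasticsearch.river.wikipedia.support.WikiPage;
import org.elasticsearch.river.wikipedia.support.WikiXMLSAXParser;

public class DumpExample {
    public static void main(String[] args) throws Exception {
        URL dump = new URL("http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2");
        // The convenience method wires the handler into a SAX parser and streams the dump.
        WikiXMLSAXParser.parseWikipediaDump(dump, new PageCallbackHandler() {
            public void process(WikiPage page) {
                System.out.println(page.getTitle());
            }
        });
    }
}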

View File

@@ -0,0 +1,6 @@
/**
* Copied from wikixmlj on 2010-10-03.
*
* Changed from File handling to URL handling, and removed the DOM parser.
*/
package org.elasticsearch.river.wikipedia.support;

View File

@@ -21,6 +21,7 @@ include 'plugins-transport-memcached'
include 'plugins-transport-thrift'
include 'plugins-river-twitter'
include 'plugins-river-wikipedia'
include 'plugins-river-rabbitmq'
include 'plugins-river-couchdb'