From c4d17860a1e4ee48cb62ae2cd8a04e87f6f3dde4 Mon Sep 17 00:00:00 2001
From: kimchy
Date: Sun, 3 Oct 2010 22:22:45 +0200
Subject: [PATCH] Wikipedia River: A river to index wikipedia, closes #403.

---
 .idea/dictionaries/kimchy.xml | 1 +
 .idea/modules.xml | 1 +
 .idea/modules/elasticsearch-root.iml | 1 +
 .idea/modules/plugin-river-wikipedia.iml | 18 ++
 plugins/river/wikipedia/build.gradle | 136 ++++++++++
 .../src/main/java/es-plugin.properties | 2 +
 .../river/wikipedia/WikipediaRiverPlugin.java | 40 +++
 .../river/wikipedia/WikipediaRiver.java | 235 ++++++++++++++++++
 .../river/wikipedia/WikipediaRiverModule.java | 33 +++
 .../river/wikipedia/support/InfoBox.java | 37 +++
 .../wikipedia/support/IteratorHandler.java | 34 +++
 .../support/PageCallbackHandler.java | 26 ++
 .../support/SAXPageCallbackHandler.java | 60 +++++
 .../river/wikipedia/support/WikiPage.java | 150 +++++++++++
 .../wikipedia/support/WikiPageIterator.java | 47 ++++
 .../wikipedia/support/WikiTextParser.java | 196 +++++++++++++++
 .../wikipedia/support/WikiXMLParser.java | 92 +++++++
 .../support/WikiXMLParserFactory.java | 14 ++
 .../wikipedia/support/WikiXMLSAXParser.java | 97 ++++++++
 .../river/wikipedia/support/package-info.java | 6 +
 settings.gradle | 1 +
 21 files changed, 1227 insertions(+)
 create mode 100644 .idea/modules/plugin-river-wikipedia.iml
 create mode 100644 plugins/river/wikipedia/build.gradle
 create mode 100644 plugins/river/wikipedia/src/main/java/es-plugin.properties
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/plugin/river/wikipedia/WikipediaRiverPlugin.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiver.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java
 create mode 100644 plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java

diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index a14704b29d4..a3b805e4f1c 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -148,6 +148,7 @@
           <w>uuid</w>
           <w>versioned</w>
           <w>warmup</w>
+          <w>wikipedia</w>
           <w>wildcards</w>
           <w>xcontent</w>
           <w>xson</w>
diff --git a/.idea/modules.xml b/.idea/modules.xml
index 35a9784378c..9f45e9cbc74 100644
--- a/.idea/modules.xml
+++ b/.idea/modules.xml
@@ -14,6 +14,7 @@
+      <module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-river-wikipedia.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-river-wikipedia.iml" />
diff --git a/.idea/modules/elasticsearch-root.iml b/.idea/modules/elasticsearch-root.iml
index fed9ddd6100..d2ff8c25d3d 100644
--- a/.idea/modules/elasticsearch-root.iml
+++ b/.idea/modules/elasticsearch-root.iml
@@ -24,6 +24,7 @@
+    <orderEntry type="module" module-name="plugin-river-wikipedia" />
diff --git a/.idea/modules/plugin-river-wikipedia.iml b/.idea/modules/plugin-river-wikipedia.iml
new file mode 100644
index 00000000000..af1297c3ddb
--- /dev/null
+++ b/.idea/modules/plugin-river-wikipedia.iml
@@ -0,0 +1,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/plugins/river/wikipedia/build.gradle b/plugins/river/wikipedia/build.gradle
new file mode 100644
index 00000000000..c09c57a5795
--- /dev/null
+++ b/plugins/river/wikipedia/build.gradle
@@ -0,0 +1,136 @@
+dependsOn(':elasticsearch')
+
+apply plugin: 'java'
+apply plugin: 'maven'
+
+archivesBaseName = "elasticsearch-river-wikipedia"
+
+explodedDistDir = new File(distsDir, 'exploded')
+
+manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::Wikipedia", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
+
+configurations.compile.transitive = true
+configurations.testCompile.transitive = true
+
+// no need to use the resource dir
+sourceSets.main.resources.srcDirs 'src/main/java'
+sourceSets.test.resources.srcDirs 'src/test/java'
+
+// add the source files to the dist jar
+//jar {
+//    from sourceSets.main.allJava
+//}
+
+configurations {
+    dists
+    distLib {
+        visible = false
+        transitive = false
+    }
+}
+
+dependencies {
+    compile project(':elasticsearch')
+
+    testCompile project(':test-testng')
+    testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
+    testCompile 'org.hamcrest:hamcrest-all:1.1'
+}
+
+test {
+    useTestNG()
+    jvmArgs = ["-ea", "-Xmx1024m"]
+    suiteName = project.name
+    listeners = ["org.elasticsearch.util.testng.Listeners"]
+    systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
+}
+
+task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
+    [explodedDistDir]*.mkdirs()
+
+    copy {
+        from configurations.distLib
+        into explodedDistDir
+    }
+
+    // remove elasticsearch files (compile above adds the elasticsearch one)
+    ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
+
+    copy {
+        from libsDir
+        into explodedDistDir
+    }
+
+    ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
+    ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
+}
+
+task zip(type: Zip, dependsOn: ['explodedDist']) {
+    from(explodedDistDir) {
+    }
+}
+
+task release(dependsOn: [zip]) << {
+    ant.delete(dir: explodedDistDir)
+    copy {
+        from distsDir
+        into(new File(rootProject.distsDir, "plugins"))
+    }
+}
+
+configurations {
+    deployerJars
+}
+
+dependencies {
+    deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
+}
+
+task sourcesJar(type: Jar, dependsOn: classes) {
+    classifier = 'sources'
+    from sourceSets.main.allSource
+}
+
+task javadocJar(type: Jar, dependsOn: javadoc) {
+    classifier = 'javadoc'
+    from javadoc.destinationDir
+}
+
+artifacts {
+    archives sourcesJar
+    archives javadocJar
+}
+
+uploadArchives {
+    repositories.mavenDeployer {
+        configuration = configurations.deployerJars
+        repository(url: rootProject.mavenRepoUrl) {
+            authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
+        }
+        snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
+            authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
+        }
+
+        pom.project {
+            inceptionYear '2009'
+            name 'elasticsearch-plugins-river-wikipedia'
+            description 'Wikipedia River Plugin for ElasticSearch'
+            licenses {
+                license {
+                    name 'The Apache Software License, Version 2.0'
+                    url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+                    distribution 'repo'
+                }
+            }
+            scm {
+                connection 'git://github.com/elasticsearch/elasticsearch.git'
+                developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
+                url 'http://github.com/elasticsearch/elasticsearch'
+            }
+        }
+
+        pom.whenConfigured {pom ->
+            pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
+        }
+    }
+}
\ No newline at end of file
diff --git a/plugins/river/wikipedia/src/main/java/es-plugin.properties b/plugins/river/wikipedia/src/main/java/es-plugin.properties
new file mode 100644
index 00000000000..38b8df1dae5
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/es-plugin.properties
@@ -0,0 +1,2 @@
+plugin=org.elasticsearch.plugin.river.wikipedia.WikipediaRiverPlugin
+
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/plugin/river/wikipedia/WikipediaRiverPlugin.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/plugin/river/wikipedia/WikipediaRiverPlugin.java
new file mode 100644
index 00000000000..7ade4428888
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/plugin/river/wikipedia/WikipediaRiverPlugin.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.plugin.river.wikipedia;
+
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.plugins.AbstractPlugin;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class WikipediaRiverPlugin extends AbstractPlugin {
+
+    @Inject public WikipediaRiverPlugin() {
+    }
+
+    @Override public String name() {
+        return "river-wikipedia";
+    }
+
+    @Override public String description() {
+        return "River Wikipedia Plugin";
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiver.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiver.java
new file mode 100644
index 00000000000..de3dfa22eff
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiver.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.river.AbstractRiverComponent;
+import org.elasticsearch.river.River;
+import org.elasticsearch.river.RiverName;
+import org.elasticsearch.river.RiverSettings;
+import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
+import org.elasticsearch.river.wikipedia.support.WikiPage;
+import org.elasticsearch.river.wikipedia.support.WikiXMLParser;
+import org.elasticsearch.river.wikipedia.support.WikiXMLParserFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class WikipediaRiver extends AbstractRiverComponent implements River {
+
+    private final StringBuilder sb = new StringBuilder();
+
+    private final Client client;
+
+    private final URL url;
+
+    private final String indexName;
+
+    private final String typeName;
+
+    private final int bulkSize;
+
+    private final int dropThreshold;
+
+    private final AtomicInteger onGoingBulks = new AtomicInteger();
+
+    private volatile Thread thread;
+
+    private volatile boolean closed = false;
+
+    private volatile BulkRequestBuilder currentRequest;
+
+    @Inject public WikipediaRiver(RiverName riverName, RiverSettings settings, Client client) throws MalformedURLException {
+        super(riverName, settings);
+        this.client = client;
+
+        String url = "http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2";
+        if (settings.settings().containsKey("wikipedia")) {
+            Map<String, Object> wikipediaSettings = (Map<String, Object>) settings.settings().get("wikipedia");
+            url = XContentMapValues.nodeStringValue(wikipediaSettings.get("url"), url);
+        }
+
+        logger.info("creating wikipedia stream river for [{}]", url);
+        this.url = new URL(url);
+
+        if (settings.settings().containsKey("index")) {
+            Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
+            indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
+            typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "page");
+            this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
+            this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), 10);
+        } else {
+            indexName = riverName.name();
+            typeName = "page";
+            bulkSize = 100;
+            dropThreshold = 10;
+        }
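+        // Illustrative note (editorial addition, not part of the original commit): the
+        // parsing above would honor a river definition along these lines:
+        //   { "type" : "wikipedia",
+        //     "wikipedia" : { "url" : "http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2" },
+        //     "index" : { "index" : "my_wiki_index", "type" : "page" },
+        //     "bulk_size" : 100, "drop_threshold" : 10 }
+        // Note that bulk_size and drop_threshold are read from the top level of the river
+        // settings, and only when an "index" object is also present.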
+    }
+
+    @Override public void start() {
+        logger.info("starting wikipedia stream");
+        try {
+            client.admin().indices().prepareCreate(indexName).execute().actionGet();
+        } catch (Exception e) {
+            if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
+                // that's fine
+            } else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) {
+                // ok, not recovered yet..., let's start indexing and hope we recover by the first bulk
+                // TODO: a smarter approach would be to register a cluster event listener here, and only start indexing when the block is removed...
+            } else {
+                logger.warn("failed to create index [{}], disabling river...", e, indexName);
+                return;
+            }
+        }
+        currentRequest = client.prepareBulk();
+        WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(url);
+        try {
+            parser.setPageCallback(new PageCallback());
+        } catch (Exception e) {
+            logger.error("failed to create parser", e);
+            return;
+        }
+        thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "wikipedia_slurper").newThread(new Parser(parser));
+        thread.start();
+    }
+
+    @Override public void close() {
+        logger.info("closing wikipedia river");
+        closed = true;
+        if (thread != null) {
+            thread.interrupt();
+        }
+    }
+
+    private class Parser implements Runnable {
+        private final WikiXMLParser parser;
+
+        private Parser(WikiXMLParser parser) {
+            this.parser = parser;
+        }
+
+        @Override public void run() {
+            try {
+                parser.parse();
+            } catch (Exception e) {
+                if (closed) {
+                    return;
+                }
+                logger.error("failed to parse stream", e);
+            }
+        }
+    }
+
+    private class PageCallback implements PageCallbackHandler {
+
+        @Override public void process(WikiPage page) {
+            if (closed) {
+                return;
+            }
+            String title = stripTitle(page.getTitle());
+            if (logger.isTraceEnabled()) {
+                logger.trace("page {} : {}", page.getID(), page.getTitle());
+            }
+            try {
+                XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+                builder.field("title", title);
+                builder.field("text", page.getText());
+                builder.field("redirect", page.isRedirect());
+                builder.field("special", page.isSpecialPage());
+                builder.field("stub", page.isStub());
+                builder.field("disambiguation", page.isDisambiguationPage());
+
+                builder.startArray("category");
+                for (String s : page.getCategories()) {
+                    builder.value(s);
+                }
+                builder.endArray();
+
+                builder.startArray("link");
+                for (String s : page.getLinks()) {
+                    builder.value(s);
+                }
+                builder.endArray();
+
+                builder.endObject();
+                // For now, we index (and not create) since we need to keep track of what we indexed...
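+                // Indexing with the page ID as the document id also makes reprocessing the
+                // same dump idempotent: an existing document is overwritten, not duplicated.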
+                currentRequest.add(Requests.indexRequest(indexName).type(typeName).id(page.getID()).create(false).source(builder));
+                processBulkIfNeeded();
+            } catch (Exception e) {
+                logger.warn("failed to construct index request", e);
+            }
+        }
+
+        private void processBulkIfNeeded() {
+            if (currentRequest.numberOfActions() >= bulkSize) {
+                // execute the bulk operation
+                int currentOnGoingBulks = onGoingBulks.incrementAndGet();
+                if (currentOnGoingBulks > dropThreshold) {
+                    // TODO: instead of dropping, we could wait here and slow down the wikipedia parsing
+                    onGoingBulks.decrementAndGet();
+                    logger.warn("dropping bulk, [{}] crossed threshold [{}]", onGoingBulks, dropThreshold);
+                } else {
+                    try {
+                        currentRequest.execute(new ActionListener<BulkResponse>() {
+                            @Override public void onResponse(BulkResponse bulkResponse) {
+                                onGoingBulks.decrementAndGet();
+                            }
+
+                            @Override public void onFailure(Throwable e) {
+                                onGoingBulks.decrementAndGet();
+                                logger.warn("failed to execute bulk", e);
+                            }
+                        });
+                    } catch (Exception e) {
+                        logger.warn("failed to process bulk", e);
+                    }
+                }
+                currentRequest = client.prepareBulk();
+            }
+        }
+    }
+
+    private String stripTitle(String title) {
+        sb.setLength(0);
+        sb.append(title);
+        while (sb.length() > 0 && (sb.charAt(sb.length() - 1) == '\n' || (sb.charAt(sb.length() - 1) == ' '))) {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java
new file mode 100644
index 00000000000..e430908dc91
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/WikipediaRiverModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.river.River;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class WikipediaRiverModule extends AbstractModule {
+
+    @Override protected void configure() {
+        bind(River.class).to(WikipediaRiver.class).asEagerSingleton();
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java
new file mode 100644
index 00000000000..38e9902a206
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/InfoBox.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+/**
+ * A class abstracting a Wiki infobox
+ *
+ * @author Delip Rao
+ */
+public class InfoBox {
+    String infoBoxWikiText = null;
+
+    InfoBox(String infoBoxWikiText) {
+        this.infoBoxWikiText = infoBoxWikiText;
+    }
+
+    public String dumpRaw() {
+        return infoBoxWikiText;
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java
new file mode 100644
index 00000000000..d9b320d967a
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/IteratorHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+public class IteratorHandler implements PageCallbackHandler {
+
+    private WikiXMLParser parser = null;
+
+    public IteratorHandler(WikiXMLParser myParser) {
+        parser = myParser;
+    }
+
+    public void process(WikiPage page) {
+        parser.notifyPage(page);
+    }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java
new file mode 100644
index 00000000000..2f6b2a640a2
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/PageCallbackHandler.java
@@ -0,0 +1,26 @@
+package org.elasticsearch.river.wikipedia.support;
+
+/**
+ * Interface to allow streamed processing of pages.
+ * This allows SAX-style processing of Wikipedia XML files.
+ * The registered callback is executed on each page
+ * element in the XML file.
+ * <p/>
+ * Using callbacks consumes less memory, a useful feature for large
+ * dumps like English and German.
+ *
+ * @author Delip Rao
+ * @see WikiXMLSAXParser
+ * @see WikiPage
+ */
+public interface PageCallbackHandler {
+    /**
+     * This is the callback method that should be implemented before
+     * registering with {@link WikiXMLSAXParser}
+     *
+     * @param page a wikipedia page object
+     * @see WikiPage
+     */
+    public void process(WikiPage page);
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java
new file mode 100644
index 00000000000..263d362ca9a
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/SAXPageCallbackHandler.java
@@ -0,0 +1,60 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * A wrapper class for the PageCallbackHandler
+ *
+ * @author Jason Smith
+ */
+public class SAXPageCallbackHandler extends DefaultHandler {
+
+    private PageCallbackHandler pageHandler;
+    private WikiPage currentPage;
+    private String currentTag;
+
+    private String currentWikitext;
+    private String currentTitle;
+    private String currentID;
+
+    public SAXPageCallbackHandler(PageCallbackHandler ph) {
+        pageHandler = ph;
+    }
+
+    @Override public void startElement(String uri, String name, String qName, Attributes attr) {
+        currentTag = qName;
+        if (qName.equals("page")) {
+            currentPage = new WikiPage();
+            currentWikitext = "";
+            currentTitle = "";
+            currentID = "";
+        }
+    }
+
+    @Override public void endElement(String uri, String name, String qName) {
+        if (qName.equals("page")) {
+            currentPage.setTitle(currentTitle);
+            currentPage.setID(currentID);
+            currentPage.setWikiText(currentWikitext);
+            pageHandler.process(currentPage);
+        }
+        if (qName.equals("mediawiki")) {
+            // TODO hasMoreElements() should now return false
+        }
+    }
+
+    @Override public void characters(char[] ch, int start, int length) {
+        if (currentTag.equals("title")) {
+            currentTitle = currentTitle.concat(new String(ch, start, length));
+        }
+        // TODO: To avoid looking at the revision ID, only the first ID is taken.
+        // I'm not sure how big the block size is in each call to characters(),
+        // so this may be unsafe.
+        else if ((currentTag.equals("id")) && (currentID.length() == 0)) {
+            currentID = new String(ch, start, length);
+        } else if (currentTag.equals("text")) {
+            currentWikitext = currentWikitext.concat(new String(ch, start, length));
+        }
+    }
+}
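Editorial note (not part of the patch): the handler above follows the standard MediaWiki dump layout, roughly:

    <mediawiki>
      <page>
        <title>...</title>
        <id>...</id>
        <revision>
          <id>...</id>
          <text>wiki markup</text>
        </revision>
      </page>
    </mediawiki>

Both page and revision carry an id element, which is why characters() keeps only the first id seen per page.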
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java
new file mode 100644
index 00000000000..4943bd7c278
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.List;
+
+/**
+ * Data structures for a wikipedia page.
+ *
+ * @author Delip Rao
+ */
+public class WikiPage {
+
+    private String title = null;
+    private WikiTextParser wikiTextParser = null;
+    private String id = null;
+
+    /**
+     * Set the page title. This is not intended for direct use.
+     *
+     * @param title the title of the page
+     */
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    /**
+     * Set the wiki text associated with this page.
+     * This setter also introduces side effects. This is not intended for direct use.
+     *
+     * @param wtext wiki-formatted text
+     */
+    public void setWikiText(String wtext) {
+        wikiTextParser = new WikiTextParser(wtext);
+    }
+
+    /**
+     * @return a string containing the page title.
+     */
+    public String getTitle() {
+        return title;
+    }
+
+    /**
+     * @param languageCode a language code, such as "es" or "de"
+     * @return a string containing the title translated
+     *         in the given languageCode.
+     */
+    public String getTranslatedTitle(String languageCode) {
+        return wikiTextParser.getTranslatedTitle(languageCode);
+    }
+
+    /**
+     * @return true if this is a disambiguation page.
+     */
+    public boolean isDisambiguationPage() {
+        return title.contains("(disambiguation)") || wikiTextParser.isDisambiguationPage();
+    }
+
+    /**
+     * @return true for "special pages" -- like Category:, Wikipedia:, etc
+     */
+    public boolean isSpecialPage() {
+        return title.contains(":");
+    }
+
+    /**
+     * Use this method to get the wiki text associated with this page.
+     * Useful for custom processing of the wiki text.
+     *
+     * @return a string containing the wiki text.
+     */
+    public String getWikiText() {
+        return wikiTextParser.getText();
+    }
+
+    /**
+     * @return true if this is a redirection page
+     */
+    public boolean isRedirect() {
+        return wikiTextParser.isRedirect();
+    }
+
+    /**
+     * @return true if this is a stub page
+     */
+    public boolean isStub() {
+        return wikiTextParser.isStub();
+    }
+
+    /**
+     * @return the title of the page being redirected to.
+     */
+    public String getRedirectPage() {
+        return wikiTextParser.getRedirectText();
+    }
+
+    /**
+     * @return plain text stripped of all wiki formatting.
+     */
+    public String getText() {
+        return wikiTextParser.getPlainText();
+    }
+
+    /**
+     * @return a list of categories the page belongs to, null if this is a redirection/disambiguation page
+     */
+    public List<String> getCategories() {
+        return wikiTextParser.getCategories();
+    }
+
+    /**
+     * @return a list of links contained in the page
+     */
+    public List<String> getLinks() {
+        return wikiTextParser.getLinks();
+    }
+
+    public void setID(String id) {
+        this.id = id;
+    }
+
+    public InfoBox getInfoBox() {
+        return wikiTextParser.getInfoBox();
+    }
+
+    public String getID() {
+        return id;
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java
new file mode 100644
index 00000000000..0a793d5c45d
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiPageIterator.java
@@ -0,0 +1,47 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.Vector;
+
+/**
+ * A class to iterate the pages after the wikipedia XML file has been parsed with a
+ * {@link WikiXMLParser} implementation that collects pages.
+ *
+ * @author Delip Rao
+ * @see WikiXMLParser
+ */
+public class WikiPageIterator {
+
+    private int currentPage = 0;
+    private int lastPage = 0;
+    Vector<WikiPage> pageList = null;
+
+    public WikiPageIterator(Vector<WikiPage> list) {
+        pageList = list;
+        if (pageList != null)
+            lastPage = pageList.size();
+    }
+
+    /**
+     * @return true if there are more pages to be read
+     */
+    public boolean hasMorePages() {
+        return (currentPage < lastPage);
+    }
+
+    /**
+     * Reset the iterator.
+     */
+    public void reset() {
+        currentPage = 0;
+    }
+
+    /**
+     * Advances the iterator by one position.
+     *
+     * @return a {@link WikiPage}, or null if there are no more pages
+     */
+    public WikiPage nextPage() {
+        if (hasMorePages())
+            return pageList.elementAt(currentPage++);
+        return null;
+    }
+}
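Editorial sketch of the iterator API (not in the patch; note that the SAX parser below always throws from getIterator(), so with this commit the callback style is the one the river actually uses):

    WikiPageIterator it = parser.getIterator(); // only for parsers that collect pages
    while (it.hasMorePages()) {
        WikiPage page = it.nextPage();
        System.out.println(page.getTitle());
    }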
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java
new file mode 100644
index 00000000000..0459b936838
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiTextParser.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * For internal use only -- Used by the {@link WikiPage} class.
+ * Can also be used as a standalone class to parse wiki-formatted text.
+ *
+ * @author Delip Rao
+ */
+public class WikiTextParser {
+
+    private String wikiText = null;
+    private ArrayList<String> pageCats = null;
+    private ArrayList<String> pageLinks = null;
+    private boolean redirect = false;
+    private String redirectString = null;
+    private static Pattern redirectPattern =
+            Pattern.compile("#REDIRECT\\s+\\[\\[(.*?)\\]\\]");
+    private boolean stub = false;
+    private boolean disambiguation = false;
+    private static Pattern stubPattern = Pattern.compile("\\-stub\\}\\}");
+    private static Pattern disambCatPattern = Pattern.compile("\\{\\{disambig\\}\\}");
+    private InfoBox infoBox = null;
+
+    public WikiTextParser(String wtext) {
+        wikiText = wtext;
+        Matcher matcher = redirectPattern.matcher(wikiText);
+        if (matcher.find()) {
+            redirect = true;
+            if (matcher.groupCount() == 1)
+                redirectString = matcher.group(1);
+        }
+        matcher = stubPattern.matcher(wikiText);
+        stub = matcher.find();
+        matcher = disambCatPattern.matcher(wikiText);
+        disambiguation = matcher.find();
+    }
+
+    public boolean isRedirect() {
+        return redirect;
+    }
+
+    public boolean isStub() {
+        return stub;
+    }
+
+    public String getRedirectText() {
+        return redirectString;
+    }
+
+    public String getText() {
+        return wikiText;
+    }
+
+    public ArrayList<String> getCategories() {
+        if (pageCats == null) parseCategories();
+        return pageCats;
+    }
+
+    public ArrayList<String> getLinks() {
+        if (pageLinks == null) parseLinks();
+        return pageLinks;
+    }
+
+    private void parseCategories() {
+        pageCats = new ArrayList<String>();
+        Pattern catPattern = Pattern.compile("\\[\\[Category:(.*?)\\]\\]", Pattern.MULTILINE);
+        Matcher matcher = catPattern.matcher(wikiText);
+        while (matcher.find()) {
+            String[] temp = matcher.group(1).split("\\|");
+            pageCats.add(temp[0]);
+        }
+    }
+
+    private void parseLinks() {
+        pageLinks = new ArrayList<String>();
+
+        Pattern catPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE);
+        Matcher matcher = catPattern.matcher(wikiText);
+        while (matcher.find()) {
+            String[] temp = matcher.group(1).split("\\|");
+            if (temp == null || temp.length == 0) continue;
+            String link = temp[0];
+            if (link.contains(":") == false) {
+                pageLinks.add(link);
+            }
+        }
+    }
+
+    public String getPlainText() {
+        String text = wikiText.replaceAll("&gt;", ">");
+        text = text.replaceAll("&lt;", "<");
+        text = text.replaceAll("<ref>.*?</ref>", " ");
+        text = text.replaceAll("</?.*?>", " ");
+        text = text.replaceAll("\\{\\{.*?\\}\\}", " ");
+        text = text.replaceAll("\\[\\[.*?:.*?\\]\\]", " ");
+        text = text.replaceAll("\\[\\[(.*?)\\]\\]", "$1");
+        text = text.replaceAll("\\s(.*?)\\|(\\w+\\s)", " $2");
+        text = text.replaceAll("\\[.*?\\]", " ");
+        text = text.replaceAll("\\'+", "");
+        return text;
+    }
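+    // Editorial illustration (not in the original source): for the input
+    //   "''Anarchism'' is a [[political philosophy]]{{citation needed}}"
+    // getPlainText() returns, modulo whitespace,
+    //   "Anarchism is a political philosophy"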
+
+    public InfoBox getInfoBox() {
+        // parseInfoBox is expensive. Doing it only once, like the other parse* methods
+        if (infoBox == null)
+            infoBox = parseInfoBox();
+        return infoBox;
+    }
+
+    private InfoBox parseInfoBox() {
+        String INFOBOX_CONST_STR = "{{Infobox";
+        int startPos = wikiText.indexOf(INFOBOX_CONST_STR);
+        if (startPos < 0) return null;
+        int bracketCount = 2;
+        int endPos = startPos + INFOBOX_CONST_STR.length();
+        for (; endPos < wikiText.length(); endPos++) {
+            switch (wikiText.charAt(endPos)) {
+                case '}':
+                    bracketCount--;
+                    break;
+                case '{':
+                    bracketCount++;
+                    break;
+                default:
+            }
+            if (bracketCount == 0) break;
+        }
+        String infoBoxText = wikiText.substring(startPos, endPos + 1);
+        infoBoxText = stripCite(infoBoxText); // strip clumsy {{cite}} tags
+        // strip any html formatting
+        infoBoxText = infoBoxText.replaceAll("&gt;", ">");
+        infoBoxText = infoBoxText.replaceAll("&lt;", "<");
+        infoBoxText = infoBoxText.replaceAll("<ref>.*?</ref>", " ");
+        infoBoxText = infoBoxText.replaceAll("</?.*?>", " ");
+        return new InfoBox(infoBoxText);
+    }
+
+    private String stripCite(String text) {
+        String CITE_CONST_STR = "{{cite";
+        int startPos = text.indexOf(CITE_CONST_STR);
+        if (startPos < 0) return text;
+        int bracketCount = 2;
+        int endPos = startPos + CITE_CONST_STR.length();
+        for (; endPos < text.length(); endPos++) {
+            switch (text.charAt(endPos)) {
+                case '}':
+                    bracketCount--;
+                    break;
+                case '{':
+                    bracketCount++;
+                    break;
+                default:
+            }
+            if (bracketCount == 0) break;
+        }
+        text = text.substring(0, startPos - 1) + text.substring(endPos);
+        return stripCite(text);
+    }
+
+    public boolean isDisambiguationPage() {
+        return disambiguation;
+    }
+
+    public String getTranslatedTitle(String languageCode) {
+        Pattern pattern = Pattern.compile("^\\[\\[" + languageCode + ":(.*?)\\]\\]$", Pattern.MULTILINE);
+        Matcher matcher = pattern.matcher(wikiText);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        return null;
+    }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java
new file mode 100644
index 00000000000..253a0fc3dd2
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParser.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import org.elasticsearch.common.compress.bzip2.CBZip2InputStream;
+import org.xml.sax.InputSource;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * @author Delip Rao
+ * @author Jason Smith
+ */
+public abstract class WikiXMLParser {
+
+    private URL wikiXMLFile = null;
+    protected WikiPage currentPage = null;
+
+    public WikiXMLParser(URL fileName) {
+        wikiXMLFile = fileName;
+    }
+
+    /**
+     * Set a callback handler. The callback is executed every time a
+     * page instance is detected in the stream. Custom handlers are
+     * implementations of {@link PageCallbackHandler}
+     *
+     * @param handler the callback handler to register
+     * @throws Exception
+     */
+    public abstract void setPageCallback(PageCallbackHandler handler) throws Exception;
+
+    /**
+     * The main parse method.
+     *
+     * @throws Exception
+     */
+    public abstract void parse() throws Exception;
+
+    /**
+     * @return an iterator to the list of pages
+     * @throws Exception
+     */
+    public abstract WikiPageIterator getIterator() throws Exception;
+
+    /**
+     * @return An InputSource created from wikiXMLFile
+     * @throws Exception
+     */
+    protected InputSource getInputSource() throws Exception {
+        BufferedReader br = null;
+
+        if (wikiXMLFile.toExternalForm().endsWith(".gz")) {
+            br = new BufferedReader(new InputStreamReader(new GZIPInputStream(wikiXMLFile.openStream()), "UTF-8"));
+        } else if (wikiXMLFile.toExternalForm().endsWith(".bz2")) {
+            InputStream fis = wikiXMLFile.openStream();
+            byte[] ignoreBytes = new byte[2];
+            fis.read(ignoreBytes); // "B", "Z" bytes from commandline tools
+            br = new BufferedReader(new InputStreamReader(new CBZip2InputStream(fis), "UTF-8"));
+        } else {
+            br = new BufferedReader(new InputStreamReader(wikiXMLFile.openStream(), "UTF-8"));
+        }
+
+        return new InputSource(br);
+    }
+
+    protected void notifyPage(WikiPage page) {
+        currentPage = page;
+    }
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java
new file mode 100644
index 00000000000..a7aeb60156c
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLParserFactory.java
@@ -0,0 +1,14 @@
+package org.elasticsearch.river.wikipedia.support;
+
+import java.net.URL;
+
+/**
+ * @author Delip Rao
+ */
+public class WikiXMLParserFactory {
+
+    public static WikiXMLParser getSAXParser(URL fileName) {
+        return new WikiXMLSAXParser(fileName);
+    }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java
new file mode 100644
index 00000000000..3f64715f466
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/WikiXMLSAXParser.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.river.wikipedia.support;
+
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import java.net.URL;
+
+/**
+ * A SAX Parser for Wikipedia XML dumps.
+ *
+ * @author Jason Smith
+ */
+public class WikiXMLSAXParser extends WikiXMLParser {
+
+    private XMLReader xmlReader;
+    private PageCallbackHandler pageHandler = null;
+
+    public WikiXMLSAXParser(URL fileName) {
+        super(fileName);
+        try {
+            xmlReader = XMLReaderFactory.createXMLReader();
+            pageHandler = new IteratorHandler(this);
+        } catch (SAXException e) {
+            // TODO improve this error handling; the reader is unusable if creation fails
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Set a callback handler. The callback is executed every time a
+     * page instance is detected in the stream. Custom handlers are
+     * implementations of {@link PageCallbackHandler}
+     *
+     * @param handler the callback handler to register
+     * @throws Exception
+     */
+    public void setPageCallback(PageCallbackHandler handler) throws Exception {
+        pageHandler = handler;
+    }
+
+    /**
+     * The main parse method.
+     *
+     * @throws Exception
+     */
+    public void parse() throws Exception {
+        xmlReader.setContentHandler(new SAXPageCallbackHandler(pageHandler));
+        xmlReader.parse(getInputSource());
+    }
+
+    /**
+     * This parser is event driven, so it
+     * can't provide a page iterator.
+     */
+    @Override
+    public WikiPageIterator getIterator() throws Exception {
+        if (!(pageHandler instanceof IteratorHandler)) {
+            throw new Exception("Custom page callback found. Will not iterate.");
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * A convenience method for the Wikipedia SAX interface
+     *
+     * @param dumpFile - path to the Wikipedia dump
+     * @param handler  - callback handler used for parsing
+     * @throws Exception
+     */
+    public static void parseWikipediaDump(URL dumpFile,
+                                          PageCallbackHandler handler) throws Exception {
+        WikiXMLParser wxsp = WikiXMLParserFactory.getSAXParser(dumpFile);
+        wxsp.setPageCallback(handler);
+        wxsp.parse();
+    }
+
+}
diff --git a/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java
new file mode 100644
index 00000000000..9f41cbee375
--- /dev/null
+++ b/plugins/river/wikipedia/src/main/java/org/elasticsearch/river/wikipedia/support/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * Copied from wikixmlj on 2010-10-03.
+ *
+ * Changed from File handling to URL handling, and removed the DOM parser.
+ */
+package org.elasticsearch.river.wikipedia.support;
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index d122a1c63c6..a0461387349 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,6 +21,7 @@
 include 'plugins-transport-memcached'
 include 'plugins-transport-thrift'
 
 include 'plugins-river-twitter'
+include 'plugins-river-wikipedia'
 include 'plugins-river-rabbitmq'
 include 'plugins-river-couchdb'
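A minimal usage sketch of the support API added by this commit (editorial addition; the class name DumpTitles is hypothetical, everything else is defined in the patch):

    import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
    import org.elasticsearch.river.wikipedia.support.WikiPage;
    import org.elasticsearch.river.wikipedia.support.WikiXMLSAXParser;

    import java.net.URL;

    public class DumpTitles {
        public static void main(String[] args) throws Exception {
            URL dump = new URL("http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2");
            // Stream the dump and print each page id and title as it is parsed.
            WikiXMLSAXParser.parseWikipediaDump(dump, new PageCallbackHandler() {
                public void process(WikiPage page) {
                    System.out.println(page.getID() + "\t" + page.getTitle());
                }
            });
        }
    }

This mirrors what WikipediaRiver does, with the river's PageCallback replaced by a trivial handler.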