River CouchDB Plugin, closes #382.

kimchy 2010-09-23 02:26:12 +02:00
parent a59912db02
commit ecaaeb5250
20 changed files with 719 additions and 34 deletions
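
A note before the diff: the river is configured entirely through the _meta document indexed into the _river index. What follows is a minimal sketch of registering it, modeled on the CouchdbRiverTest added in this commit; the nested keys mirror what the CouchdbRiver constructor reads, the values shown are its defaults, and "my_db" is a hypothetical river/database name.

import org.elasticsearch.client.Client;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class RegisterCouchdbRiver {
    public static void register(Client client) throws Exception {
        // a river is created by indexing a _meta doc into the _river index;
        // the document type ("my_db" here) becomes the river name
        client.prepareIndex("_river", "my_db", "_meta")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("type", "couchdb")
                        .startObject("couchdb")
                        .field("host", "localhost")   // default
                        .field("port", 5984)          // default
                        .field("db", "my_db")         // the CouchDB database to stream
                        .endObject()
                        .startObject("index")
                        .field("index", "my_db")      // defaults to the couchdb db name
                        .field("type", "my_db")
                        .field("bulk_size", 100)
                        .field("bulk_timeout", "10ms")
                        .endObject()
                        .endObject())
                .execute().actionGet();
    }
}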

View File

@ -32,6 +32,7 @@
<w>conf</w>
<w>configurator</w>
<w>coord</w>
<w>couchdb</w>
<w>cpus</w>
<w>cstr</w>
<w>datagram</w>
@ -117,6 +118,7 @@
<w>segs</w>
<w>serializers</w>
<w>sigar</w>
<w>slurper</w>
<w>snapshotting</w>
<w>solaris</w>
<w>startup</w>

View File

@ -9,6 +9,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-couchdb.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-couchdb.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />

View File

@ -21,6 +21,8 @@
<orderEntry type="module" module-name="plugins-hadoop" />
<orderEntry type="module" module-name="plugin-cloud-aws" />
<orderEntry type="module" module-name="plugin-river-twitter" />
<orderEntry type="module" module-name="plugin-river-couchdb" />
<orderEntry type="module" module-name="plugin-river-rabbitmq" />
<orderEntry type="module" module-name="test-integration" />
</component>
</module>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/river/couchdb/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/river/couchdb/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/river/couchdb">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/river/couchdb/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/river/couchdb/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/river/couchdb/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
</component>
</module>

View File

@ -92,6 +92,8 @@ public interface XContentParser {
Map<String, Object> map() throws IOException;
Map<String, Object> mapAndClose() throws IOException;
String text() throws IOException;
String textOrNull() throws IOException;

View File

@ -102,4 +102,12 @@ public abstract class AbstractXContentParser implements XContentParser {
@Override public Map<String, Object> map() throws IOException {
return XContentMapConverter.readMap(this);
}
@Override public Map<String, Object> mapAndClose() throws IOException {
try {
return map();
} finally {
close();
}
}
}
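
The new mapAndClose() pairs map() with a guaranteed close(), so a caller can parse a one-shot payload without leaking the parser. A minimal sketch of the call pattern the CouchDB river below uses on each _changes line:

import java.io.IOException;
import java.util.Map;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

public class ChangeLineParser {
    // parses one JSON line into a Map; the parser is closed even if reading fails
    public static Map<String, Object> parse(String json) throws IOException {
        return XContentFactory.xContent(XContentType.JSON).createParser(json).mapAndClose();
    }
}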

View File

@ -87,6 +87,13 @@ public class XContentMapValues {
return Long.parseLong(node.toString());
}
public static boolean nodeBooleanValue(Object node, boolean defaultValue) {
if (node == null) {
return defaultValue;
}
return nodeBooleanValue(node);
}
public static boolean nodeBooleanValue(Object node) {
if (node instanceof Boolean) {
return (Boolean) node;
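
The two-argument nodeBooleanValue() overload added here supplies a fallback when a setting is absent; the RabbitMQ river further down uses it for its new "ordered" flag. A one-line usage sketch (indexSettings being the already-parsed settings map):

import java.util.Map;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

public class OrderedSetting {
    public static boolean ordered(Map<String, Object> indexSettings) {
        // false when "ordered" is missing, otherwise the parsed boolean
        return XContentMapValues.nodeBooleanValue(indexSettings.get("ordered"), false);
    }
}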

View File

@ -0,0 +1,136 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-river-couchdb"
explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::CouchDB", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar
//jar {
// from sourceSets.main.allJava
//}
configurations {
dists
distLib {
visible = false
transitive = false
}
}
dependencies {
compile project(':elasticsearch')
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
testCompile 'org.hamcrest:hamcrest-all:1.1'
}
test {
useTestNG()
jvmArgs = ["-ea", "-Xmx1024m"]
suiteName = project.name
listeners = ["org.elasticsearch.util.testng.Listeners"]
systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}
uploadArchives {
repositories.mavenDeployer {
configuration = configurations.deployerJars
repository(url: rootProject.mavenRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
pom.project {
inceptionYear '2009'
name 'elasticsearch-plugins-river-couchdb'
description 'River CouchDB Plugin for ElasticSearch'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
scm {
connection 'git://github.com/elasticsearch/elasticsearch.git'
developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
url 'http://github.com/elasticsearch/elasticsearch'
}
}
pom.whenConfigured {pom ->
pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
}
}
}

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.river.couchdb.CouchdbRiverPlugin

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.river.couchdb;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class CouchdbRiverPlugin extends AbstractPlugin {
@Inject public CouchdbRiverPlugin() {
}
@Override public String name() {
return "river-couchdb";
}
@Override public String description() {
return "River CouchDB Plugin";
}
}

View File

@ -0,0 +1,316 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.couchdb;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.util.concurrent.jsr166y.TransferQueue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
/**
* @author kimchy (shay.banon)
*/
public class CouchdbRiver extends AbstractRiverComponent implements River {
private final Client client;
private final String riverIndexName;
private final String couchHost;
private final int couchPort;
private final String couchDb;
private final String couchFilter;
private final String indexName;
private final String typeName;
private final int bulkSize;
private final TimeValue bulkTimeout;
private volatile Thread slurperThread;
private volatile Thread indexerThread;
private volatile boolean closed;
private final TransferQueue<String> stream = new LinkedTransferQueue<String>();
@Inject public CouchdbRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client) {
super(riverName, settings);
this.riverIndexName = riverIndexName;
this.client = client;
if (settings.settings().containsKey("couchdb")) {
Map<String, Object> couchSettings = (Map<String, Object>) settings.settings().get("couchdb");
couchHost = XContentMapValues.nodeStringValue(couchSettings.get("host"), "localhost");
couchPort = XContentMapValues.nodeIntegerValue(couchSettings.get("port"), 5984);
couchDb = XContentMapValues.nodeStringValue(couchSettings.get("db"), riverName.name());
couchFilter = XContentMapValues.nodeStringValue(couchSettings.get("filter"), null);
} else {
couchHost = "localhost";
couchPort = 5984;
couchDb = "db";
couchFilter = null;
}
if (settings.settings().containsKey("index")) {
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), couchDb);
typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), couchDb);
bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
if (indexSettings.containsKey("bulk_timeout")) {
bulkTimeout = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(indexSettings.get("bulk_timeout"), "10ms"), TimeValue.timeValueMillis(10));
} else {
bulkTimeout = TimeValue.timeValueMillis(10);
}
} else {
indexName = couchDb;
typeName = couchDb;
bulkSize = 100;
bulkTimeout = TimeValue.timeValueMillis(10);
}
}
@Override public void start() {
logger.info("starting couchdb stream: host [{}], port [{}], filter [{}], db [{}], indexing to [{}]/[{}]", couchHost, couchPort, couchFilter, couchDb, indexName, typeName);
try {
client.admin().indices().prepareCreate(indexName).execute().actionGet();
} catch (Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// that's fine
} else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) {
// ok, not recovered yet..., let's start indexing and hope we recover by the first bulk
// TODO: smarter logic could register a cluster event listener here and only start indexing once the block is removed...
} else {
logger.warn("failed to create index [{}], disabling river...", e, indexName);
return;
}
}
slurperThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "couchdb_river_slurper").newThread(new Slurper());
indexerThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "couchdb_river_indexer").newThread(new Indexer());
indexerThread.start();
slurperThread.start();
}
@Override public void close() {
if (closed) {
return;
}
logger.info("closing couchdb stream river");
slurperThread.interrupt();
indexerThread.interrupt();
closed = true;
}
private String processLine(String s, BulkRequestBuilder bulk) {
Map<String, Object> map = null;
try {
map = XContentFactory.xContent(XContentType.JSON).createParser(s).mapAndClose();
} catch (IOException e) {
logger.warn("failed to parse {}", e, s);
return null;
}
if (map.containsKey("error")) {
logger.warn("received error {}", s);
return null;
}
String seq = map.get("seq").toString();
String id = map.get("id").toString();
if (map.containsKey("delete") && map.get("deleted").equals("true")) {
bulk.add(deleteRequest(indexName).type(typeName).id(id));
} else if (map.containsKey("doc")) {
Map<String, Object> doc = (Map<String, Object>) map.get("doc");
bulk.add(indexRequest(indexName).type(typeName).id(id).source(doc));
} else {
logger.warn("ignoring unknown change {}", s);
}
return seq;
}
private class Indexer implements Runnable {
@Override public void run() {
while (true) {
if (closed) {
return;
}
String s = null;
try {
s = stream.take();
} catch (InterruptedException e) {
if (closed) {
return;
}
continue;
}
BulkRequestBuilder bulk = client.prepareBulk();
String lastSeq = null;
String lineSeq = processLine(s, bulk);
if (lineSeq != null) {
lastSeq = lineSeq;
}
// spin a bit to see if we can get some more changes
try {
while ((s = stream.poll(bulkTimeout.millis(), TimeUnit.MILLISECONDS)) != null) {
lineSeq = processLine(s, bulk);
if (lineSeq != null) {
lastSeq = lineSeq;
}
if (bulk.numberOfActions() >= bulkSize) {
break;
}
}
} catch (InterruptedException e) {
if (closed) {
return;
}
}
if (lastSeq != null) {
try {
bulk.add(indexRequest(riverIndexName).type(riverName.name()).id("_seq").source(jsonBuilder().startObject().field("last_seq", lastSeq).endObject()));
} catch (IOException e) {
logger.warn("failed to add last_seq entry to bulk indexing");
}
}
try {
BulkResponse response = bulk.execute().actionGet();
if (response.hasFailures()) {
// TODO write to exception queue?
logger.warn("failed to execute" + response.buildFailureMessage());
}
} catch (Exception e) {
logger.warn("failed to execute bulk", e);
}
}
}
}
private class Slurper implements Runnable {
@Override public void run() {
while (true) {
if (closed) {
return;
}
String lastSeq = null;
try {
client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
GetResponse lastSeqGetResponse = client.prepareGet(riverIndexName, riverName().name(), "_seq").execute().actionGet();
if (lastSeqGetResponse.exists()) {
lastSeq = lastSeqGetResponse.sourceAsMap().get("last_seq").toString();
}
} catch (Exception e) {
logger.warn("failed to get last_seq, throttling....", e);
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
if (closed) {
return;
}
}
}
String file = "/" + couchDb + "/_changes?feed=continuous&include_docs=true&&heartbeat=10000";
if (couchFilter != null) {
file = file + "&filter=" + couchFilter;
}
if (lastSeq != null) {
file = file + "&since=" + lastSeq;
}
HttpURLConnection connection = null;
InputStream is = null;
try {
URL url = new URL("http", couchHost, couchPort, file);
connection = (HttpURLConnection) url.openConnection();
connection.setDoInput(true);
connection.setUseCaches(false);
is = connection.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line;
while ((line = reader.readLine()) != null) {
if (line.length() == 0) {
logger.trace("[couchdb] heartbeat");
continue;
}
if (logger.isTraceEnabled()) {
logger.trace("[couchdb] {}", line);
}
stream.add(line);
}
} catch (Exception e) {
if (is != null) {
try {
is.close();
} catch (IOException e1) {
// ignore
}
}
if (connection != null) {
try {
connection.disconnect();
} catch (Exception e1) {
// ignore
}
}
if (closed) {
return;
}
logger.warn("failed to read from _changes, throttling....", e);
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
if (closed) {
return;
}
}
}
}
}
}
}
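
For reference, each string the slurper hands off is one line of CouchDB's continuous _changes feed (shapes assumed from CouchDB's documented feed format): with include_docs=true an update looks roughly like {"seq":14,"id":"doc1","changes":[{"rev":"2-..."}],"doc":{...}} and a deletion like {"seq":15,"id":"doc1","changes":[{"rev":"3-..."}],"deleted":true}. processLine() maps the former to an index request and the latter to a delete request, and the indexer persists the last seq it saw as last_seq so the slurper can resume the feed with &since= after a restart.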

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.couchdb;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;
/**
* @author kimchy (shay.banon)
*/
public class CouchdbRiverModule extends AbstractModule {
@Override protected void configure() {
bind(River.class).to(CouchdbRiver.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.couchdb;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
/**
* @author kimchy (shay.banon)
*/
public class CouchdbRiverTest {
public static void main(String[] args) throws Exception {
Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "none")).node();
node.client().prepareIndex("_river", "db", "_meta").setSource(jsonBuilder().startObject().field("type", "couchdb").endObject()).execute().actionGet();
Thread.sleep(1000000);
}
}
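
Running this main() starts a local node with the gateway disabled and registers a river named "db" whose only setting is its type, so every couchdb and index option falls back to the constructor defaults above: the river streams the CouchDB database "db" on localhost:5984 into an index (and type) also named "db". The long sleep simply keeps the node alive while the river runs.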

View File

@ -1,2 +1,2 @@
plugin=org.elasticsearch.plugin.river.rabbitmq.RiverRabbitMQPlugin
plugin=org.elasticsearch.plugin.river.rabbitmq.RabbitmqRiverPlugin

View File

@ -25,9 +25,9 @@ import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class RiverRabbitMQPlugin extends AbstractPlugin {
public class RabbitmqRiverPlugin extends AbstractPlugin {
@Inject public RiverRabbitMQPlugin() {
@Inject public RabbitmqRiverPlugin() {
}
@Override public String name() {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.river.AbstractRiverComponent;
@ -38,6 +39,7 @@ import org.elasticsearch.river.RiverSettings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -46,6 +48,20 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
private final Client client;
private final String rabbitHost;
private final int rabbitPort;
private final String rabbitUser;
private final String rabbitPassword;
private final String rabbitVhost;
private final String rabbitQueue;
private final String rabbitExchange;
private final String rabbitRoutingKey;
private final int bulkSize;
private final TimeValue bulkTimeout;
private final boolean ordered;
private volatile boolean closed = false;
private volatile Thread thread;
@ -55,15 +71,54 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
@Inject public RabbitmqRiver(RiverName riverName, RiverSettings settings, Client client) {
super(riverName, settings);
this.client = client;
if (settings.settings().containsKey("rabbitmq")) {
Map<String, Object> rabbitSettings = (Map<String, Object>) settings.settings().get("rabbitmq");
rabbitHost = XContentMapValues.nodeStringValue(rabbitSettings.get("host"), ConnectionFactory.DEFAULT_HOST);
rabbitPort = XContentMapValues.nodeIntegerValue(rabbitSettings.get("port"), ConnectionFactory.DEFAULT_AMQP_PORT);
rabbitUser = XContentMapValues.nodeStringValue(rabbitSettings.get("user"), ConnectionFactory.DEFAULT_USER);
rabbitPassword = XContentMapValues.nodeStringValue(rabbitSettings.get("password"), ConnectionFactory.DEFAULT_PASS);
rabbitVhost = XContentMapValues.nodeStringValue(rabbitSettings.get("vhost"), ConnectionFactory.DEFAULT_VHOST);
rabbitQueue = XContentMapValues.nodeStringValue(rabbitSettings.get("queue"), "elasticsearch");
rabbitExchange = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange"), "elasticsearch");
rabbitRoutingKey = XContentMapValues.nodeStringValue(rabbitSettings.get("routing_key"), "elasticsearch");
} else {
rabbitHost = ConnectionFactory.DEFAULT_HOST;
rabbitPort = ConnectionFactory.DEFAULT_AMQP_PORT;
rabbitUser = ConnectionFactory.DEFAULT_USER;
rabbitPassword = ConnectionFactory.DEFAULT_PASS;
rabbitVhost = ConnectionFactory.DEFAULT_VHOST;
rabbitQueue = "elasticsearch";
rabbitExchange = "elasticsearch";
rabbitRoutingKey = "elasticsearch";
}
if (settings.settings().containsKey("index")) {
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
if (indexSettings.containsKey("bulk_timeout")) {
bulkTimeout = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(indexSettings.get("bulk_timeout"), "10ms"), TimeValue.timeValueMillis(10));
} else {
bulkTimeout = TimeValue.timeValueMillis(10);
}
ordered = XContentMapValues.nodeBooleanValue(indexSettings.get("ordered"), false);
} else {
bulkSize = 100;
bulkTimeout = TimeValue.timeValueMillis(10);
ordered = false;
}
}
@Override public void start() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(XContentMapValues.nodeStringValue(settings.settings().get("host"), ConnectionFactory.DEFAULT_HOST));
connectionFactory.setPort(XContentMapValues.nodeIntegerValue(settings.settings().get("port"), ConnectionFactory.DEFAULT_AMQP_PORT));
connectionFactory.setUsername(XContentMapValues.nodeStringValue(settings.settings().get("user"), ConnectionFactory.DEFAULT_USER));
connectionFactory.setPassword(XContentMapValues.nodeStringValue(settings.settings().get("password"), ConnectionFactory.DEFAULT_PASS));
connectionFactory.setVirtualHost(XContentMapValues.nodeStringValue(settings.settings().get("vhost"), ConnectionFactory.DEFAULT_VHOST));
connectionFactory.setHost(rabbitHost);
connectionFactory.setPort(rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setVirtualHost(rabbitVhost);
logger.info("creating rabbitmq river, host [{}], port [{}], user [{}], vhost [{}]", connectionFactory.getHost(), connectionFactory.getPort(), connectionFactory.getUsername(), connectionFactory.getVirtualHost());
@ -108,22 +163,16 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
}
}
int bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
long bulkTimeout = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_timeout"), 10);
String queue = XContentMapValues.nodeStringValue(settings.settings().get("queue"), "elasticsearch");
String exchange = XContentMapValues.nodeStringValue(settings.settings().get("exchange"), "elasticsearch");
String routingKey = XContentMapValues.nodeStringValue(settings.settings().get("routing_key"), "elasticsearch");
QueueingConsumer consumer = new QueueingConsumer(channel);
// define the queue
try {
channel.exchangeDeclare(exchange/*exchange*/, "direct"/*type*/, true /*durable*/);
channel.queueDeclare(queue/*queue*/, true /*durable*/, false/*exclusive*/, false/*autoDelete*/, null);
channel.queueBind(queue/*queue*/, exchange/*exchange*/, routingKey/*routingKey*/);
channel.basicConsume(queue/*queue*/, false/*noAck*/, consumer);
channel.exchangeDeclare(rabbitExchange/*exchange*/, "direct"/*type*/, true /*durable*/);
channel.queueDeclare(rabbitQueue/*queue*/, true /*durable*/, false/*exclusive*/, false/*autoDelete*/, null);
channel.queueBind(rabbitQueue/*queue*/, rabbitExchange/*exchange*/, rabbitRoutingKey/*routingKey*/);
channel.basicConsume(rabbitQueue/*queue*/, false/*noAck*/, consumer);
} catch (Exception e) {
if (!closed) {
logger.warn("failed to create queue [{}]", e, queue);
logger.warn("failed to create queue [{}]", e, rabbitQueue);
}
cleanup(0, "failed to create queue");
continue;
@ -167,7 +216,7 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
if (bulkRequestBuilder.numberOfActions() < bulkSize) {
// try and spin some more of those without timeout, so we have a bigger bulk (bounded by the bulk size)
try {
while ((task = consumer.nextDelivery(bulkTimeout)) != null) {
while ((task = consumer.nextDelivery(bulkTimeout.millis())) != null) {
try {
bulkRequestBuilder.add(task.getBody(), 0, task.getBody().length, false);
} catch (Exception e) {
@ -194,6 +243,24 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
logger.trace("executing bulk with [{}] actions", bulkRequestBuilder.numberOfActions());
}
if (ordered) {
try {
BulkResponse response = bulkRequestBuilder.execute().actionGet();
if (response.hasFailures()) {
// TODO write to exception queue?
logger.warn("failed to execute" + response.buildFailureMessage());
}
for (Long deliveryTag : deliveryTags) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e1) {
logger.warn("failed to ack [{}]", e1, deliveryTag);
}
}
} catch (Exception e) {
logger.warn("failed to execute bulk", e);
}
} else {
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override public void onResponse(BulkResponse response) {
if (response.hasFailures()) {
@ -217,6 +284,7 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
}
}
}
}
private void cleanup(int code, String message) {
try {

View File

@ -1,2 +1,2 @@
plugin=org.elasticsearch.plugin.river.twitter.RiverTwitterPlugin
plugin=org.elasticsearch.plugin.river.twitter.TwitterRiverPlugin

View File

@ -25,9 +25,9 @@ import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class RiverTwitterPlugin extends AbstractPlugin {
public class TwitterRiverPlugin extends AbstractPlugin {
@Inject public RiverTwitterPlugin() {
@Inject public TwitterRiverPlugin() {
}
@Override public String name() {

View File

@ -67,18 +67,22 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
super(riverName, settings);
this.client = client;
String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null);
String password = XContentMapValues.nodeStringValue(settings.settings().get("password"), null);
String user = null;
String password = null;
if (settings.settings().containsKey("twitter")) {
Map<String, Object> twitterSettings = (Map<String, Object>) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
}
logger.info("creating twitter stream river for [{}]", user);
this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), 10);
if (user == null || password == null) {
stream = null;
indexName = null;
typeName = null;
bulkSize = 100;
dropThreshold = 10;
logger.warn("no user / password specified, disabling river...");
return;
}
@ -87,9 +91,13 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status");
this.bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
this.dropThreshold = XContentMapValues.nodeIntegerValue(indexSettings.get("drop_threshold"), 10);
} else {
indexName = riverName.name();
typeName = "status";
bulkSize = 100;
dropThreshold = 10;
}
stream = new TwitterStreamFactory(new StatusHandler()).getInstance(user, password);

View File

@ -19,6 +19,7 @@ include 'plugins-transport-thrift'
include 'plugins-river-twitter'
include 'plugins-river-rabbitmq'
include 'plugins-river-couchdb'
rootProject.name = 'elasticsearch-root'
rootProject.children.each {project ->