rabbitmq river

This commit is contained in:
kimchy 2010-09-21 17:32:34 +02:00
parent 32acd88b2e
commit aa68667d63
10 changed files with 304 additions and 4 deletions

View File

@ -102,6 +102,7 @@
<w>proc</w>
<w>publishhost</w>
<w>queryparser</w>
<w>rabbitmq</w>
<w>rackspace</w>
<w>rebalance</w>
<w>rebalancing</w>

View File

@ -9,6 +9,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-river-rabbitmq.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-river-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" />

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/river/rabbitmq/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/river/rabbitmq/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/river/rabbitmq">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/river/rabbitmq/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/river/rabbitmq/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/river/rabbitmq/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module-library">
<library name="rabbitmq">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/com.rabbitmq/amqp-client/jars/amqp-client-2.1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/commons-io/commons-io/jars/commons-io-1.2.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="file://$MODULE_DIR$/../../../../../opt/rabbitmq/java-client-2.1.0/src" />
</SOURCES>
</library>
</orderEntry>
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
</component>
</module>

View File

@ -57,6 +57,20 @@ public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse>
return false;
}
public String buildFailureMessage() {
StringBuilder sb = new StringBuilder();
sb.append("failure in bulk execution:");
for (int i = 0; i < responses.length; i++) {
BulkItemResponse response = responses[i];
if (response.failed()) {
sb.append("\n[").append(i)
.append("]: index [").append(response.index()).append("], type [").append(response.type()).append("], id [").append(response.id())
.append("], message [").append(response.failureMessage()).append("]");
}
}
return sb.toString();
}
/**
* The items representing each action performed in the bulk operation (in the same order!).
*/

View File

@ -0,0 +1,142 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-river-rabbitmq"
explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::River::RabbitMQ", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar
//jar {
// from sourceSets.main.allJava
//}
configurations {
dists
distLib {
visible = false
transitive = false
}
}
dependencies {
compile project(':elasticsearch')
compile('com.rabbitmq:amqp-client:2.1.0') { transitive = false }
compile("commons-io:commons-io:1.2") { transitive = false }
distLib('com.rabbitmq:amqp-client:2.1.0') { transitive = false }
distLib("commons-io:commons-io:1.2") { transitive = false }
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
testCompile 'org.hamcrest:hamcrest-all:1.1'
}
test {
useTestNG()
jmvArgs = ["-ea", "-Xmx1024m"]
suiteName = project.name
listeners = ["org.elasticsearch.util.testng.Listeners"]
systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}
uploadArchives {
repositories.mavenDeployer {
configuration = configurations.deployerJars
repository(url: rootProject.mavenRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
pom.project {
inceptionYear '2009'
name 'elasticsearch-plugins-river-rabbitmq'
description 'Attachments Plugin for ElasticSearch'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
scm {
connection 'git://github.com/elasticsearch/elasticsearch.git'
developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
url 'http://github.com/elasticsearch/elasticsearch'
}
}
pom.whenConfigured {pom ->
pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
}
}
}

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.river.rabbitmq.RiverRabbitMQPlugin

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.river.rabbitmq;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class RiverRabbitMQPlugin extends AbstractPlugin {
@Inject public RiverRabbitMQPlugin() {
}
@Override public String name() {
return "river-rabbitmq";
}
@Override public String description() {
return "River RabbitMQ Plugin";
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
/**
* @author kimchy (shay.banon)
*/
public class RabbitMQRiverTest {
public static void main(String[] args) throws Exception {
Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "none")).node();
node.client().prepareIndex("_river", "test1", "_meta").setSource(jsonBuilder().startObject().field("type", "rabbitmq").endObject()).execute().actionGet();
ConnectionFactory cfconn = new ConnectionFactory();
cfconn.setHost("localhost");
cfconn.setPort(AMQP.PROTOCOL.PORT);
Connection conn = cfconn.newConnection();
Channel ch = conn.createChannel();
ch.exchangeDeclare("elasticsearch", "direct", true);
ch.queueDeclare("elasticsearch", true, false, false, null);
String message = "{ \"index\" : { \"index\" : \"test\", \"type\" : \"type1\", \"id\" : \"1\" }\n" +
"{ \"type1\" : { \"field1\" : \"value1\" } }\n" +
"{ \"delete\" : { \"index\" : \"test\", \"type\" : \"type1\", \"id\" : \"2\" } }\n" +
"{ \"create\" : { \"index\" : \"test\", \"type\" : \"type1\", \"id\" : \"1\" }\n" +
"{ \"type1\" : { \"field1\" : \"value1\" } }";
ch.basicPublish("elasticsearch", "elasticsearch", null, message.getBytes());
ch.close();
conn.close();
Thread.sleep(100000);
}
}

View File

@ -70,7 +70,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null);
String password = XContentMapValues.nodeStringValue(settings.settings().get("password"), null);
logger.info("creating twitter stream indexer for [{}]", user);
logger.info("creating twitter stream river for [{}]", user);
this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), 10);
@ -79,7 +79,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
stream = null;
indexName = null;
typeName = null;
logger.warn("no user / password specified, disabling indexer...");
logger.warn("no user / password specified, disabling river...");
return;
}
@ -106,7 +106,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
// ok, not recovered yet..., lets start indexing and hope we recover by the first bulk
// TODO: a smarter logic can be to register for cluster event listener here, and only start sampling when the block is removed...
} else {
logger.warn("failed to create index [{}], disabling indexer...", e, indexName);
logger.warn("failed to create index [{}], disabling river...", e, indexName);
return;
}
}
@ -115,7 +115,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
@Override public void close() {
logger.info("closing twitter stream indexer");
logger.info("closing twitter stream river");
if (stream != null) {
stream.cleanup();
stream.shutdown();

View File

@ -13,9 +13,12 @@ include 'plugins-hadoop'
include 'plugins-analysis-icu'
include 'plugins-mapper-attachments'
include 'plugins-client-groovy'
include 'plugins-transport-memcached'
include 'plugins-transport-thrift'
include 'plugins-river-twitter'
include 'plugins-river-rabbitmq'
rootProject.name = 'elasticsearch-root'
rootProject.children.each {project ->