add a twitter indexer

This commit is contained in:
kimchy 2010-09-20 18:17:02 +02:00
parent ed96d161a6
commit 94a77b69d6
19 changed files with 568 additions and 8 deletions

View File

@ -8,6 +8,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-indexer-twitter.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-indexer-twitter.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-thrift.iml" />

View File

@ -20,6 +20,7 @@
<orderEntry type="module" module-name="plugin-analysis-icu" /> <orderEntry type="module" module-name="plugin-analysis-icu" />
<orderEntry type="module" module-name="plugins-hadoop" /> <orderEntry type="module" module-name="plugins-hadoop" />
<orderEntry type="module" module-name="plugin-cloud-aws" /> <orderEntry type="module" module-name="plugin-cloud-aws" />
<orderEntry type="module" module-name="plugin-indexer-twitter" />
<orderEntry type="module" module-name="test-integration" /> <orderEntry type="module" module-name="test-integration" />
</component> </component>
</module> </module>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/indexer/twitter">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/indexer/twitter/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module-library">
<library name="twitter4j">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/org.twitter4j/twitter4j-core/jars/twitter4j-core-2.1.4.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
</component>
</module>

View File

@ -66,6 +66,10 @@ public class BulkRequest implements ActionRequest {
return this; return this;
} }
/**
* The number of actions currently held by this bulk request (the size of the
* internal requests list).
*/
public int numberOfActions() {
return requests.size();
}
@Override public ActionRequestValidationException validate() { @Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;
if (requests.isEmpty()) { if (requests.isEmpty()) {

View File

@ -75,6 +75,13 @@ public class BulkRequestBuilder extends BaseRequestBuilder<BulkRequest, BulkResp
return this; return this;
} }
/**
* The number of actions currently in the bulk, as reported by the underlying
* bulk request this builder wraps.
*/
public int numberOfActions() {
return request.numberOfActions();
}
@Override protected void doExecute(ActionListener<BulkResponse> listener) { @Override protected void doExecute(ActionListener<BulkResponse> listener) {
client.bulk(request, listener); client.bulk(request, listener);
} }

View File

@ -47,6 +47,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indexer.IndexerIndexName;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
@ -128,7 +129,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#")); listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#"));
return currentState; return currentState;
} }
if (request.index.charAt(0) == '_') { if (!request.index.equals(IndexerIndexName.Conf.DEFAULT_INDEXER_NAME) && request.index.charAt(0) == '_') {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'")); listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'"));
return currentState; return currentState;
} }

View File

@ -63,6 +63,16 @@ public class XContentMapValues {
return Integer.parseInt(node.toString()); return Integer.parseInt(node.toString());
} }
/**
 * Converts a parsed content node to an <tt>int</tt>, falling back to a default
 * when the node is absent.
 *
 * @param node         the node value, may be <tt>null</tt>
 * @param defaultValue the value returned when <tt>node</tt> is <tt>null</tt>
 * @return the int value of a {@link Number} node, the parsed string form of any
 *         other non-null node, or <tt>defaultValue</tt> for <tt>null</tt>
 * @throws NumberFormatException if a non-numeric node's string form is not a valid int
 */
public static int nodeIntegerValue(Object node, int defaultValue) {
    // null is never an instance of Number, so checking the type first is safe
    if (node instanceof Number) {
        return ((Number) node).intValue();
    }
    return node == null ? defaultValue : Integer.parseInt(node.toString());
}
public static short nodeShortValue(Object node) { public static short nodeShortValue(Object node) {
if (node instanceof Number) { if (node instanceof Number) {
return ((Number) node).shortValue(); return ((Number) node).shortValue();

View File

@ -24,5 +24,7 @@ package org.elasticsearch.indexer;
*/ */
public interface Indexer extends IndexerComponent { public interface Indexer extends IndexerComponent {
/**
* Starts the indexer. Work that can't be done while the indexer instance is
* being created (for example creating an index) is meant to be done here.
*/
void start();
void close(); void close();
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indexer; package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.BindingAnnotation; import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.settings.Settings;
import java.lang.annotation.Documented; import java.lang.annotation.Documented;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
@ -37,4 +38,12 @@ import static java.lang.annotation.RetentionPolicy.*;
@Retention(RUNTIME) @Retention(RUNTIME)
@Documented @Documented
public @interface IndexerIndexName { public @interface IndexerIndexName {
/**
 * Shared lookup of the indexer index name, so every component resolves the
 * same setting with the same default.
 */
static class Conf {

    /**
     * Name of the indexer index used when the <tt>indexer.index_name</tt> setting is absent.
     * Declared <tt>final</tt> so the shared default can't be reassigned at runtime.
     */
    public static final String DEFAULT_INDEXER_NAME = "_indexer";

    /**
     * Resolves the indexer index name from the given settings.
     *
     * @param settings the settings to read <tt>indexer.index_name</tt> from
     * @return the configured name, or {@link #DEFAULT_INDEXER_NAME} when not set
     */
    public static String indexName(Settings settings) {
        return settings.get("indexer.index_name", DEFAULT_INDEXER_NAME);
    }
}
} }

View File

@ -36,7 +36,7 @@ public class IndexersModule extends AbstractModule {
} }
@Override protected void configure() { @Override protected void configure() {
bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(settings.get("indexer.index_name", "indexer")); bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(IndexerIndexName.Conf.indexName(settings));
bind(IndexersService.class).asEagerSingleton(); bind(IndexersService.class).asEagerSingleton();
bind(IndexerClusterService.class).asEagerSingleton(); bind(IndexerClusterService.class).asEagerSingleton();
bind(IndexersRouter.class).asEagerSingleton(); bind(IndexersRouter.class).asEagerSingleton();

View File

@ -66,7 +66,7 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
@Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) { @Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) {
super(settings); super(settings);
this.indexerIndexName = settings.get("indexer.index_name", "indexer"); this.indexerIndexName = IndexerIndexName.Conf.indexName(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
@ -118,6 +118,11 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
indexersInjectors.put(indexerName, indexInjector); indexersInjectors.put(indexerName, indexInjector);
Indexer indexer = indexInjector.getInstance(Indexer.class); Indexer indexer = indexInjector.getInstance(Indexer.class);
indexers = MapBuilder.newMapBuilder(indexers).put(indexerName, indexer).immutableMap(); indexers = MapBuilder.newMapBuilder(indexers).put(indexerName, indexer).immutableMap();
// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
indexer.start();
return indexer; return indexer;
} }
@ -168,16 +173,22 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
if (!routing.node().equals(localNode)) { if (!routing.node().equals(localNode)) {
continue; continue;
} }
// if its already created, ignore it
if (indexers.containsKey(routing.indexerName())) {
continue;
}
client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener<GetResponse>() { client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener<GetResponse>() {
@Override public void onResponse(GetResponse getResponse) { @Override public void onResponse(GetResponse getResponse) {
if (getResponse.exists()) { if (!indexers.containsKey(routing.indexerName())) {
// only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet... if (getResponse.exists()) {
createIndexer(routing.indexerName(), getResponse.sourceAsMap()); // only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet...
createIndexer(routing.indexerName(), getResponse.sourceAsMap());
}
} }
} }
@Override public void onFailure(Throwable e) { @Override public void onFailure(Throwable e) {
logger.warn("failed to get _meta from [{}]/[{}]", routing.indexerName().type(), routing.indexerName().name()); logger.warn("failed to get _meta from [{}]/[{}]", e, routing.indexerName().type(), routing.indexerName().name());
} }
}); });
} }

View File

@ -35,6 +35,10 @@ public class DummyIndexer extends AbstractIndexerComponent implements Indexer {
logger.info("create"); logger.info("create");
} }
// The dummy indexer has nothing to start; it only logs the lifecycle call.
@Override public void start() {
logger.info("start");
}
@Override public void close() { @Override public void close() {
logger.info("close"); logger.info("close");
} }

View File

@ -35,11 +35,13 @@ import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indexer.IndexerIndexName;
import org.elasticsearch.indexer.IndexerName; import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.cluster.IndexerClusterService; import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState; import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask; import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask;
import org.elasticsearch.indexer.cluster.IndexerNodeHelper; import org.elasticsearch.indexer.cluster.IndexerNodeHelper;
import org.elasticsearch.indices.IndexMissingException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -58,7 +60,7 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
@Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) { @Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) {
super(settings); super(settings);
this.indexerIndexName = settings.get("indexer.index_name", "indexer"); this.indexerIndexName = IndexerIndexName.Conf.indexName(settings);
this.indexerClusterService = indexerClusterService; this.indexerClusterService = indexerClusterService;
this.client = client; this.client = client;
clusterService.add(this); clusterService.add(this);
@ -111,6 +113,8 @@ public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> i
} }
} catch (ClusterBlockException e) { } catch (ClusterBlockException e) {
// ignore, we will get it next time // ignore, we will get it next time
} catch (IndexMissingException e) {
// ignore, we will get it next time
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", e, mappingType); logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
} }

View File

@ -0,0 +1,140 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-indexer-twitter"
explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Indexer::Twitter", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar
//jar {
// from sourceSets.main.allJava
//}
configurations {
dists
distLib {
visible = false
transitive = false
}
}
dependencies {
compile project(':elasticsearch')
compile('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
distLib('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
testCompile 'org.hamcrest:hamcrest-all:1.1'
}
// Test task configuration: run with TestNG, using the shared elasticsearch listeners.
test {
    useTestNG()
    // was misspelled "jmvArgs", so these flags were never handed to the test JVM;
    // -ea enables assertions, -Xmx1024m caps the heap at 1g
    jvmArgs = ["-ea", "-Xmx1024m"]
    suiteName = project.name
    listeners = ["org.elasticsearch.util.testng.Listeners"]
    // allow overriding the test logging configuration from the command line
    systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}
// Maven deployment of the plugin artifacts (release and snapshot repositories),
// including the metadata written into the generated POM.
uploadArchives {
    repositories.mavenDeployer {
        configuration = configurations.deployerJars
        repository(url: rootProject.mavenRepoUrl) {
            authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
        }
        snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
            authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
        }

        pom.project {
            inceptionYear '2009'
            name 'elasticsearch-plugins-indexer-twitter'
            // previously said 'Attachments Plugin', copied over from another plugin's build file
            description 'Twitter Indexer Plugin for ElasticSearch'
            licenses {
                license {
                    name 'The Apache Software License, Version 2.0'
                    url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
                    distribution 'repo'
                }
            }
            scm {
                connection 'git://github.com/elasticsearch/elasticsearch.git'
                developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
                url 'http://github.com/elasticsearch/elasticsearch'
            }
        }

        pom.whenConfigured {pom ->
            pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
        }
    }
}

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.indexer.twitter.IndexerTwitterPlugin

View File

@ -0,0 +1,261 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.twitter;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indexer.AbstractIndexerComponent;
import org.elasticsearch.indexer.Indexer;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.IndexerSettings;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import twitter4j.*;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * An {@link Indexer} that listens to a twitter4j {@link TwitterStream} and indexes every
 * received status (and applies every deletion notice) using bulk requests.
 *
 * <p>Indexer settings read by this class:
 * <ul>
 * <li><tt>user</tt> / <tt>password</tt> - twitter credentials; when either is missing the indexer is disabled</li>
 * <li><tt>bulk_size</tt> - number of actions collected before a bulk is executed (default 100)</li>
 * <li><tt>drop_threshold</tt> - maximum number of concurrently executing bulks before new ones are dropped (default 10)</li>
 * <li><tt>index.index</tt> - target index name (defaults to the indexer name)</li>
 * <li><tt>index.type</tt> - target type name (defaults to <tt>status</tt>)</li>
 * </ul>
 *
 * @author kimchy (shay.banon)
 */
public class TwitterIndexer extends AbstractIndexerComponent implements Indexer {

    private final Client client;

    private final String indexName;

    private final String typeName;

    private final int bulkSize;

    private final int dropThreshold;

    // null when the indexer is disabled (no user / password configured)
    private final TwitterStream stream;

    // number of bulk requests that were executed and did not complete yet
    private final AtomicInteger onGoingBulks = new AtomicInteger();

    // the bulk currently being filled; replaced with a fresh one each time a bulk is executed or dropped
    private volatile BulkRequestBuilder currentRequest;

    /**
     * Reads the indexer settings and creates (but does not start) the twitter stream.
     *
     * @param indexerName the name of this indexer, also used as the default index name
     * @param settings    the indexer settings (see the class documentation for the keys read)
     * @param client      the client used to create the index and execute bulk requests
     */
    @Inject public TwitterIndexer(IndexerName indexerName, IndexerSettings settings, Client client) {
        super(indexerName, settings);
        this.client = client;

        String user = XContentMapValues.nodeStringValue(settings.settings().get("user"), null);
        String password = XContentMapValues.nodeStringValue(settings.settings().get("password"), null);

        logger.info("creating twitter stream indexer for [{}]", user);

        this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), 100);
        this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), 10);

        if (user == null || password == null) {
            stream = null;
            indexName = null;
            typeName = null;
            logger.warn("no user / password specified, disabling indexer...");
            return;
        }

        if (settings.settings().containsKey("index")) {
            // the "index" entry is expected to be an object (map) holding the "index" / "type" keys
            @SuppressWarnings("unchecked")
            Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
            indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), indexerName.name());
            // default to the same type used when no "index" object is configured at all
            typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status");
        } else {
            indexName = indexerName.name();
            typeName = "status";
        }

        stream = new TwitterStreamFactory(new StatusHandler()).getInstance(user, password);
    }

    /**
     * Creates the target index (an already existing index is fine) and then starts sampling
     * from the twitter stream. Does nothing when the indexer was disabled at construction time.
     */
    @Override public void start() {
        if (stream == null) {
            // disabled in the constructor (no user / password), there is nothing to start
            return;
        }
        logger.info("starting twitter stream");
        try {
            client.admin().indices().prepareCreate(indexName).execute().actionGet();
        } catch (Exception e) {
            if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                // that's fine, keep going and start the stream against the existing index
            } else {
                logger.warn("failed to create index [{}], disabling indexer...", e, indexName);
                return;
            }
        }
        // must be set before the stream starts delivering statuses to the handler
        currentRequest = client.prepareBulk();
        try {
            stream.sample();
        } catch (Exception e) {
            logger.warn("failed to start twitter stream, disabling indexer...", e);
        }
    }

    /**
     * Cleans up and shuts down the twitter stream, if one was created.
     */
    @Override public void close() {
        logger.info("closing twitter stream indexer");
        if (stream != null) {
            stream.cleanup();
            stream.shutdown();
        }
    }

    /**
     * Stream listener translating twitter events into index / delete actions on the current bulk.
     */
    private class StatusHandler extends StatusAdapter {

        /**
         * Builds a json document out of the status and adds it (with the status id as the
         * document id, create only) to the current bulk.
         */
        @Override public void onStatus(Status status) {
            if (logger.isTraceEnabled()) {
                logger.trace("status {} : {}", status.getUser().getName(), status.getText());
            }
            try {
                XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
                builder.field("text", status.getText());
                builder.field("created_at", status.getCreatedAt());
                builder.field("source", status.getSource());
                builder.field("truncated", status.isTruncated());

                if (status.getUserMentions() != null) {
                    builder.startArray("mention");
                    for (User user : status.getUserMentions()) {
                        builder.startObject();
                        builder.field("id", user.getId());
                        builder.field("name", user.getName());
                        builder.field("screen_name", user.getScreenName());
                        builder.endObject();
                    }
                    builder.endArray();
                }

                // -1 is treated as "not available" for the numeric values below
                if (status.getRetweetCount() != -1) {
                    builder.field("retweet_count", status.getRetweetCount());
                }

                if (status.getInReplyToStatusId() != -1) {
                    builder.startObject("in_reply");
                    builder.field("status", status.getInReplyToStatusId());
                    if (status.getInReplyToUserId() != -1) {
                        builder.field("user_id", status.getInReplyToUserId());
                        builder.field("user_screen_name", status.getInReplyToScreenName());
                    }
                    builder.endObject();
                }

                if (status.getHashtags() != null) {
                    builder.array("hashtag", status.getHashtags());
                }
                if (status.getContributors() != null) {
                    builder.array("contributor", status.getContributors());
                }
                if (status.getGeoLocation() != null) {
                    builder.startObject("location");
                    builder.field("lat", status.getGeoLocation().getLatitude());
                    builder.field("lon", status.getGeoLocation().getLongitude());
                    builder.endObject();
                }
                if (status.getPlace() != null) {
                    builder.startObject("place");
                    builder.field("id", status.getPlace().getId());
                    builder.field("name", status.getPlace().getName());
                    builder.field("type", status.getPlace().getPlaceType());
                    builder.field("full_name", status.getPlace().getFullName());
                    builder.field("street_address", status.getPlace().getStreetAddress());
                    builder.field("country", status.getPlace().getCountry());
                    builder.field("country_code", status.getPlace().getCountryCode());
                    builder.field("url", status.getPlace().getURL());
                    builder.endObject();
                }
                if (status.getURLs() != null) {
                    builder.startArray("link");
                    for (URL url : status.getURLs()) {
                        if (url != null) {
                            builder.value(url.toExternalForm());
                        }
                    }
                    builder.endArray();
                }
                if (status.getAnnotations() != null) {
                    builder.startObject("annotation");
                    for (Annotation ann : status.getAnnotations().getAnnotations()) {
                        builder.startObject(ann.getType());
                        for (Map.Entry<String, String> entry : ann.getAttributes().entrySet()) {
                            builder.field(entry.getKey(), entry.getValue());
                        }
                        builder.endObject();
                    }
                    builder.endObject();
                }

                builder.startObject("user");
                builder.field("id", status.getUser().getId());
                builder.field("name", status.getUser().getName());
                builder.field("screen_name", status.getUser().getScreenName());
                builder.field("location", status.getUser().getLocation());
                builder.field("description", status.getUser().getDescription());
                builder.endObject();

                builder.endObject();
                currentRequest.add(Requests.indexRequest(indexName).type(typeName).id(Long.toString(status.getId())).create(true).source(builder));
                processBulkIfNeeded();
            } catch (Exception e) {
                logger.warn("failed to construct index request", e);
            }
        }

        /**
         * Adds a delete action for the deleted status to the current bulk.
         */
        @Override public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            if (statusDeletionNotice.getStatusId() != -1) {
                currentRequest.add(Requests.deleteRequest(indexName).type(typeName).id(Long.toString(statusDeletionNotice.getStatusId())));
                processBulkIfNeeded();
            }
        }

        @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // intentionally ignored
        }

        @Override public void onException(Exception ex) {
            logger.warn("stream failure", ex);
        }

        /**
         * Executes the current bulk once it holds at least <tt>bulkSize</tt> actions, unless more
         * than <tt>dropThreshold</tt> bulks are already executing, in which case the bulk is dropped.
         * Either way a fresh bulk is started afterwards.
         */
        private void processBulkIfNeeded() {
            if (currentRequest.numberOfActions() >= bulkSize) {
                // execute the bulk operation
                int currentOnGoingBulks = onGoingBulks.incrementAndGet();
                if (currentOnGoingBulks > dropThreshold) {
                    onGoingBulks.decrementAndGet();
                    // report the count that actually crossed the threshold, not the value after the decrement
                    logger.warn("dropping bulk, [{}] crossed threshold [{}]", currentOnGoingBulks, dropThreshold);
                } else {
                    try {
                        currentRequest.execute(new ActionListener<BulkResponse>() {
                            @Override public void onResponse(BulkResponse bulkResponse) {
                                onGoingBulks.decrementAndGet();
                            }

                            @Override public void onFailure(Throwable e) {
                                // a failed bulk is no longer on going; without this the counter only
                                // grows and every bulk ends up dropped once it passes the threshold
                                onGoingBulks.decrementAndGet();
                                logger.warn("failed to execute bulk", e);
                            }
                        });
                    } catch (Exception e) {
                        // the bulk was never handed off, so release its slot here
                        onGoingBulks.decrementAndGet();
                        logger.warn("failed to process bulk", e);
                    }
                }
                currentRequest = client.prepareBulk();
            }
        }
    }
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.twitter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.indexer.Indexer;
/**
 * Guice module wiring the {@link Indexer} binding to the twitter implementation.
 *
 * @author kimchy (shay.banon)
 */
public class TwitterIndexerModule extends AbstractModule {

    /**
     * Binds {@link Indexer} to {@link TwitterIndexer} as an eager singleton.
     */
    @Override protected void configure() {
        bind(Indexer.class)
                .to(TwitterIndexer.class)
                .asEagerSingleton();
    }
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.indexer.twitter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
/**
 * Plugin entry point for the twitter indexer; it only exposes the plugin's
 * name and description.
 *
 * @author kimchy (shay.banon)
 */
public class IndexerTwitterPlugin extends AbstractPlugin {

    private static final String NAME = "indexer-twitter";

    private static final String DESCRIPTION = "Indexer Twitter Plugin";

    @Inject public IndexerTwitterPlugin() {
        // nothing to initialize
    }

    /**
     * @return the plugin name
     */
    @Override public String name() {
        return NAME;
    }

    /**
     * @return a short description of the plugin
     */
    @Override public String description() {
        return DESCRIPTION;
    }
}

View File

@ -15,6 +15,7 @@ include 'plugins-mapper-attachments'
include 'plugins-client-groovy' include 'plugins-client-groovy'
include 'plugins-transport-memcached' include 'plugins-transport-memcached'
include 'plugins-transport-thrift' include 'plugins-transport-thrift'
include 'plugins-indexer-twitter'
rootProject.name = 'elasticsearch-root' rootProject.name = 'elasticsearch-root'
rootProject.children.each {project -> rootProject.children.each {project ->