memcached initial work

kimchy 2010-04-16 22:01:16 +03:00
parent ac5285ea24
commit 51aac0cdf7
46 changed files with 1721 additions and 30 deletions

View File

@@ -34,10 +34,12 @@
<w>lifecycle</w>
<w>linefeeds</w>
<w>lucene</w>
<w>memcached</w>
<w>metadata</w>
<w>millis</w>
<w>mmap</w>
<w>multi</w>
<w>multiline</w>
<w>nanos</w>
<w>newcount</w>
<w>ngram</w>

View File

@@ -0,0 +1,9 @@
<component name="libraryTable">
<library name="spymemcached">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/spy/memcached/jars/memcached-2.4.2.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</component>

View File

@@ -7,6 +7,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" filepath="$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-integration.iml" filepath="$PROJECT_DIR$/.idea/modules//test-integration.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-testng.iml" filepath="$PROJECT_DIR$/.idea/modules//test-testng.iml" />
</modules>

View File

@@ -13,6 +13,7 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module" module-name="plugin-memcached" />
<orderEntry type="module" module-name="plugin-attachments" />
<orderEntry type="module" module-name="test-integration" />
</component>

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/memcached/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/memcached/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/memcached">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/memcached/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/memcached/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/memcached/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="spymemcached" level="project" />
</component>
</module>

View File

@@ -109,7 +109,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
if (System.getProperty("jgroups.bind_addr") == null) {
// automatically set the bind address based on ElasticSearch default bindings...
try {
InetAddress bindAddress = HostResolver.resultBindHostAddress(null, settings, HostResolver.LOCAL_IP);
InetAddress bindAddress = HostResolver.resolveBindHostAddress(null, settings, HostResolver.LOCAL_IP);
if ((bindAddress instanceof Inet4Address && HostResolver.isIPv4()) || (bindAddress instanceof Inet6Address && !HostResolver.isIPv4())) {
sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress());
System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress());

View File

@@ -39,9 +39,9 @@ public class NettyHttpRequest extends AbstractRestRequest implements HttpRequest
private final org.jboss.netty.handler.codec.http.HttpRequest request;
private Map<String, String> params;
private final Map<String, String> params;
private String path;
private final String path;
public NettyHttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request) {
this.request = request;

View File

@@ -163,7 +163,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
hostAddressX = resultBindHostAddress(bindHost, settings);
hostAddressX = resolveBindHostAddress(bindHost, settings);
} catch (IOException e) {
throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
}
@@ -189,11 +189,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resultPublishHostAddress(publishHost, settings);
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if it's 0.0.0.0, we can't publish that; default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resultPublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}

View File

@@ -23,7 +23,7 @@ import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet;
import org.jboss.netty.channel.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
public class OpenChannelsHandler implements ChannelUpstreamHandler {

View File

@@ -116,7 +116,7 @@ public class JmxService {
connectorServer.start();
// create the publish url
String publishHost = HostResolver.resultPublishHostAddress(settings.get("jmx.publishHost"), settings, LOCAL_IP).getHostAddress();
String publishHost = HostResolver.resolvePublishHostAddress(settings.get("jmx.publishHost"), settings, LOCAL_IP).getHostAddress();
publishUrl = settings.get("jmx.publishUrl", JMXRMI_PUBLISH_URI_PATTERN).replace("{jmx.port}", Integer.toString(portNumber)).replace("{jmx.host}", publishHost);
} catch (Exception e) {
lastException.set(e);

View File

@@ -46,6 +46,9 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(POST, "/_cache/clear", this);
controller.registerHandler(POST, "/{index}/_cache/clear", this);
controller.registerHandler(GET, "/_cache/clear", this);
controller.registerHandler(GET, "/{index}/_cache/clear", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -48,6 +48,7 @@ public class RestCreateIndexAction extends BaseRestHandler {
@Inject public RestCreateIndexAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.PUT, "/{index}", this);
controller.registerHandler(RestRequest.Method.POST, "/{index}", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -46,6 +46,9 @@ public class RestFlushAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(POST, "/_flush", this);
controller.registerHandler(POST, "/{index}/_flush", this);
controller.registerHandler(GET, "/_flush", this);
controller.registerHandler(GET, "/{index}/_flush", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -50,6 +50,9 @@ public class RestPutMappingAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(PUT, "/{index}/_mapping", this);
controller.registerHandler(PUT, "/{index}/{type}/_mapping", this);
controller.registerHandler(POST, "/{index}/_mapping", this);
controller.registerHandler(POST, "/{index}/{type}/_mapping", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -46,6 +46,9 @@ public class RestOptimizeAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(POST, "/_optimize", this);
controller.registerHandler(POST, "/{index}/_optimize", this);
controller.registerHandler(GET, "/_optimize", this);
controller.registerHandler(GET, "/{index}/_optimize", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -46,6 +46,9 @@ public class RestRefreshAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(POST, "/_refresh", this);
controller.registerHandler(POST, "/{index}/_refresh", this);
controller.registerHandler(GET, "/_refresh", this);
controller.registerHandler(GET, "/{index}/_refresh", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@@ -43,7 +43,9 @@ public class RestIndexAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
controller.registerHandler(POST, "/{index}/{type}/{id}", this);
controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", new CreateHandler());
controller.registerHandler(POST, "/{index}/{type}/{id}/_create", new CreateHandler());
}
final class CreateHandler implements RestHandler {

View File

@@ -233,7 +233,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
hostAddressX = resultBindHostAddress(bindHost, settings);
hostAddressX = resolveBindHostAddress(bindHost, settings);
} catch (IOException e) {
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
}
@@ -261,11 +261,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resultPublishHostAddress(publishHost, settings);
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if it's 0.0.0.0, we can't publish that; default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resultPublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}

View File

@@ -25,4 +25,265 @@ package org.elasticsearch.util;
public class Bytes {
public static final byte[] EMPTY_ARRAY = new byte[0];
final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999,
99999999, 999999999, Integer.MAX_VALUE};
private static final byte[] LONG_MIN_VALUE_BYTES = "-9223372036854775808".getBytes();
// Requires positive x
static int stringSize(int x) {
for (int i = 0; ; i++)
if (x <= sizeTable[i])
return i + 1;
}
/**
* Blatant copy of Integer.toString, but returning a byte array instead of a String, as
* string charset decoding/encoding was killing us on performance.
*
* @param i integer to convert
* @return byte[] array containing literal ASCII char representation
*/
public static byte[] itoa(int i) {
int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
byte[] buf = new byte[size];
getChars(i, size, buf);
return buf;
}
final static byte[] digits = {
'0', '1', '2', '3', '4', '5',
'6', '7', '8', '9', 'a', 'b',
'c', 'd', 'e', 'f', 'g', 'h',
'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't',
'u', 'v', 'w', 'x', 'y', 'z'
};
final static byte[] DigitTens = {
'0', '0', '0', '0', '0', '0', '0', '0', '0', '0',
'1', '1', '1', '1', '1', '1', '1', '1', '1', '1',
'2', '2', '2', '2', '2', '2', '2', '2', '2', '2',
'3', '3', '3', '3', '3', '3', '3', '3', '3', '3',
'4', '4', '4', '4', '4', '4', '4', '4', '4', '4',
'5', '5', '5', '5', '5', '5', '5', '5', '5', '5',
'6', '6', '6', '6', '6', '6', '6', '6', '6', '6',
'7', '7', '7', '7', '7', '7', '7', '7', '7', '7',
'8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
'9', '9', '9', '9', '9', '9', '9', '9', '9', '9',
};
final static byte[] DigitOnes = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
};
static void getChars(int i, int index, byte[] buf) {
int q, r;
int charPos = index;
byte sign = 0;
if (i < 0) {
sign = '-';
i = -i;
}
// Generate two digits per iteration
while (i >= 65536) {
q = i / 100;
// really: r = i - (q * 100);
r = i - ((q << 6) + (q << 5) + (q << 2));
i = q;
buf[--charPos] = DigitOnes[r];
buf[--charPos] = DigitTens[r];
}
// Fall thru to fast mode for smaller numbers
// assert(i <= 65536, i);
for (; ;) {
q = (i * 52429) >>> (16 + 3);
r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ...
buf[--charPos] = digits[r];
i = q;
if (i == 0) break;
}
if (sign != 0) {
buf[--charPos] = sign;
}
}
public static int atoi(byte[] s)
throws NumberFormatException {
int result = 0;
boolean negative = false;
int i = 0, len = s.length;
int limit = -Integer.MAX_VALUE;
int multmin;
int digit;
if (len > 0) {
byte firstChar = s[0];
if (firstChar < '0') { // Possible leading "-"
if (firstChar == '-') {
negative = true;
limit = Integer.MIN_VALUE;
} else
throw new NumberFormatException();
if (len == 1) // Cannot have lone "-"
throw new NumberFormatException();
i++;
}
multmin = limit / 10;
while (i < len) {
// Accumulating negatively avoids surprises near MAX_VALUE
digit = Character.digit(s[i++], 10);
if (digit < 0) {
throw new NumberFormatException();
}
if (result < multmin) {
throw new NumberFormatException();
}
result *= 10;
if (result < limit + digit) {
throw new NumberFormatException();
}
result -= digit;
}
} else {
throw new NumberFormatException();
}
return negative ? result : -result;
}
public static byte[] ltoa(long i) {
if (i == Long.MIN_VALUE)
return LONG_MIN_VALUE_BYTES;
int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
byte[] buf = new byte[size];
getChars(i, size, buf);
return buf;
}
/**
* Places bytes representing the long i into the
* byte array buf. The bytes are placed into
* the buffer backwards, starting with the least significant
* digit at the specified index (exclusive), and working
* backwards from there.
*
* Will fail if i == Long.MIN_VALUE
*/
static void getChars(long i, int index, byte[] buf) {
long q;
int r;
int charPos = index;
byte sign = 0;
if (i < 0) {
sign = '-';
i = -i;
}
// Get 2 digits/iteration using longs until quotient fits into an int
while (i > Integer.MAX_VALUE) {
q = i / 100;
// really: r = i - (q * 100);
r = (int) (i - ((q << 6) + (q << 5) + (q << 2)));
i = q;
buf[--charPos] = DigitOnes[r];
buf[--charPos] = DigitTens[r];
}
// Get 2 digits/iteration using ints
int q2;
int i2 = (int) i;
while (i2 >= 65536) {
q2 = i2 / 100;
// really: r = i2 - (q * 100);
r = i2 - ((q2 << 6) + (q2 << 5) + (q2 << 2));
i2 = q2;
buf[--charPos] = DigitOnes[r];
buf[--charPos] = DigitTens[r];
}
// Fall thru to fast mode for smaller numbers
// assert(i2 <= 65536, i2);
for (; ;) {
q2 = (i2 * 52429) >>> (16 + 3);
r = i2 - ((q2 << 3) + (q2 << 1)); // r = i2-(q2*10) ...
buf[--charPos] = digits[r];
i2 = q2;
if (i2 == 0) break;
}
if (sign != 0) {
buf[--charPos] = sign;
}
}
// Requires positive x
static int stringSize(long x) {
long p = 10;
for (int i = 1; i < 19; i++) {
if (x < p)
return i;
p = 10 * p;
}
return 19;
}
public static long atol(byte[] s)
throws NumberFormatException {
long result = 0;
boolean negative = false;
int i = 0, len = s.length;
long limit = -Long.MAX_VALUE;
long multmin;
int digit;
if (len > 0) {
byte firstChar = s[0];
if (firstChar < '0') { // Possible leading "-"
if (firstChar == '-') {
negative = true;
limit = Long.MIN_VALUE;
} else
throw new NumberFormatException();
if (len == 1) // Cannot have lone "-"
throw new NumberFormatException();
i++;
}
multmin = limit / 10;
while (i < len) {
// Accumulating negatively avoids surprises near MAX_VALUE
digit = Character.digit(s[i++], 10);
if (digit < 0) {
throw new NumberFormatException();
}
if (result < multmin) {
throw new NumberFormatException();
}
result *= 10;
if (result < limit + digit) {
throw new NumberFormatException();
}
result -= digit;
}
} else {
throw new NumberFormatException();
}
return negative ? result : -result;
}
}
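For orientation, a minimal usage sketch (not part of the commit diff) of the Bytes helpers added above: itoa/ltoa write a number straight to ASCII bytes and atoi/atol parse those bytes back, which is the point of the "blatant copy of Integer.toString" comment. Only the Bytes class and its method names come from the diff; the surrounding class is illustrative.
import org.elasticsearch.util.Bytes;
public class BytesUsageSketch {
    public static void main(String[] args) {
        byte[] ascii = Bytes.itoa(-12345);          // ASCII bytes for "-12345", no String allocated
        int i = Bytes.atoi(ascii);                  // parses back to -12345
        byte[] asciiLong = Bytes.ltoa(9999999999L); // same idea for longs
        long l = Bytes.atol(asciiLong);             // parses back to 9999999999
        System.out.println(i + " " + l);
    }
}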

View File

@@ -44,19 +44,19 @@ public abstract class HostResolver {
return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true");
}
public static InetAddress resultBindHostAddress(String bindHost, Settings settings) throws IOException {
return resultBindHostAddress(bindHost, settings, null);
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException {
return resolveBindHostAddress(bindHost, settings, null);
}
public static InetAddress resultBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
public static InetAddress resultPublishHostAddress(String publishHost, Settings settings) throws IOException {
return resultPublishHostAddress(publishHost, settings, null);
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException {
return resolvePublishHostAddress(publishHost, settings, null);
}
public static InetAddress resultPublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.client
/**

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.client
import org.elasticsearch.action.ActionListener

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.client
import org.elasticsearch.action.ActionListener

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.node
import org.elasticsearch.groovy.client.GClient
@@ -27,7 +46,7 @@ class GNode {
/**
* Start the node. If the node is already started, this method is a no-op.
*/
def getStart() {
def start() {
node.start()
this
}
@@ -35,15 +54,15 @@
/**
* Stops the node. If the node is already stopped, this method is a no-op.
*/
def getStop() {
def stop() {
node.stop()
this
}
/**
* Closes the node (and {@link #stop}s it if it's running).
* Closes the node (and {@link #stop}s it if it's running).
*/
def getClose() {
def close() {
node.close()
this
}

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.node
import org.elasticsearch.groovy.util.json.JsonBuilder
@@ -7,6 +26,8 @@ import org.elasticsearch.util.settings.ImmutableSettings
import org.elasticsearch.util.settings.loader.JsonSettingsLoader
/**
* The node builder allows building a {@link GNode} instance.
*
* @author kimchy (shay.banon)
*/
public class GNodeBuilder {
@@ -24,12 +45,12 @@ public class GNodeBuilder {
settingsBuilder.put(new JsonSettingsLoader().load(settingsBytes))
}
def getBuild() {
def build() {
Node node = new InternalNode(settingsBuilder.build(), loadConfigSettings)
new GNode(node)
}
def getNode() {
build.start
def node() {
build().start()
}
}

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.util.json
import org.elasticsearch.ElasticSearchGenerationException

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.test.client
import java.util.concurrent.CountDownLatch
@@ -28,12 +47,12 @@ class DifferentApiExecutionTests {
}
}
node = nodeBuilder.node
node = nodeBuilder.node()
}
@AfterMethod
protected void tearDown() {
node.close
node.close()
}
@Test

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.test.client
import org.elasticsearch.groovy.node.GNode
@@ -24,12 +43,12 @@ class SimpleActionsTests {
}
}
node = nodeBuilder.node
node = nodeBuilder.node()
}
@AfterMethod
protected void tearDown() {
node.close
node.close()
}

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.test.node
import org.elasticsearch.groovy.node.GNode
@@ -19,7 +38,7 @@ class GNodeBuilderTests extends GroovyTestCase {
name = "test"
}
}
GNode node = nodeBuilder.node
node.stop.close
GNode node = nodeBuilder.node()
node.stop().close()
}
}

View File

@@ -1,3 +1,22 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.groovy.util.json
/**

View File

@@ -0,0 +1,140 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-memcached"
explodedDistDir = new File(distsDir, 'exploded')
manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Memcached", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDir 'src/main/java'
sourceSets.test.resources.srcDir 'src/test/java'
// add the source files to the dist jar
jar {
from sourceSets.main.allJava
}
configurations {
dists
distLib {
visible = false
transitive = false
}
}
repositories {
mavenRepo urls: "http://bleu.west.spy.net/~dustin/m2repo/"
}
dependencies {
compile project(':elasticsearch')
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
testCompile 'spy:memcached:2.4.2'
}
test {
useTestNG()
jvmArgs = ["-ea", "-Xmx1024m"]
suiteName = project.name
listeners = ["org.elasticsearch.util.testng.Listeners"]
systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}
uploadArchives {
repositories.mavenDeployer {
configuration = configurations.deployerJars
repository(url: rootProject.mavenRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
pom.project {
inceptionYear '2009'
name 'elasticsearch-plugins-memcached'
description 'Memcached Plugin for ElasticSearch'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
scm {
connection 'git://github.com/elasticsearch/elasticsearch.git'
developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
url 'http://github.com/elasticsearch/elasticsearch'
}
}
pom.whenConfigured {pom ->
pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
}
}
}

View File

@@ -0,0 +1 @@
plugin=org.elasticsearch.memcached.MemcachedPlugin

View File

@@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import com.google.inject.Module;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.util.component.LifecycleComponent;
import java.util.Collection;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedPlugin extends AbstractPlugin {
@Override public String name() {
return "memcached";
}
@Override public String description() {
return "Exports elasticsearch APIs over memcached";
}
@Override public Collection<Class<? extends Module>> modules() {
Collection<Class<? extends Module>> modules = newArrayList();
modules.add(MemcachedServerModule.class);
return modules;
}
@Override public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> services = newArrayList();
services.add(MemcachedServer.class);
return services;
}
}
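To give a concrete sense of "exports elasticsearch APIs over memcached": the plugin's build file pulls in the spymemcached client for tests, and the memcached key is parsed as a REST URI (see MemcachedRestRequest below), so client-side usage could look roughly like the sketch here. The localhost:11211 endpoint, index name, and document are illustrative assumptions; the verb mapping (set dispatched like POST, delete like DELETE, get like GET) follows the handling in MemcachedRestChannel further down.
import java.net.InetSocketAddress;
import net.spy.memcached.MemcachedClient;
public class MemcachedApiSketch {
    public static void main(String[] args) throws Exception {
        // Assumes a local node with the memcached plugin bound on the first default port, 11211.
        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));
        // The memcached key carries the REST path; SET is dispatched like a POST.
        client.set("/test/type1/1", 0, "{\"field\" : \"value\"}").get();
        // GET dispatches a REST GET on the same path and returns the response body as the value.
        Object doc = client.get("/test/type1/1");
        System.out.println(doc);
        client.shutdown();
    }
}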

View File

@@ -0,0 +1,132 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.rest.support.AbstractRestRequest;
import org.elasticsearch.rest.support.RestUtils;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedRestRequest extends AbstractRestRequest {
private final Method method;
private final String uri;
private final int dataSize;
private boolean binary;
private final Map<String, String> params;
private final String path;
private byte[] data;
public MemcachedRestRequest(Method method, String uri, int dataSize, boolean binary) {
this.method = method;
this.uri = uri;
this.dataSize = dataSize;
this.binary = binary;
this.params = new HashMap<String, String>();
int pathEndPos = uri.indexOf('?');
if (pathEndPos < 0) {
this.path = uri;
} else {
this.path = uri.substring(0, pathEndPos);
RestUtils.decodeQueryString(uri, pathEndPos + 1, params);
}
}
@Override public Method method() {
return this.method;
}
@Override public String uri() {
return this.uri;
}
@Override public String path() {
return this.path;
}
public int getDataSize() {
return dataSize;
}
public void setData(byte[] data) {
this.data = data;
}
@Override public boolean hasContent() {
return data != null;
}
@Override public InputStream contentAsStream() {
return new FastByteArrayInputStream(data);
}
@Override public byte[] contentAsBytes() {
return data;
}
@Override public String contentAsString() {
return Unicode.fromBytes(data);
}
@Override public Set<String> headerNames() {
return ImmutableSet.of();
}
@Override public String header(String name) {
return null;
}
@Override public List<String> headers(String name) {
return ImmutableList.of();
}
@Override public String cookie() {
return null;
}
@Override public boolean hasParam(String key) {
return params.containsKey(key);
}
@Override public String param(String key) {
return params.get(key);
}
@Override public Map<String, String> params() {
return params;
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedServer extends AbstractLifecycleComponent<MemcachedServer> {
private final MemcachedServerTransport transport;
private final TransportNodesInfo nodesInfo;
private final RestController restController;
@Inject public MemcachedServer(Settings settings, MemcachedServerTransport transport,
RestController restController, TransportNodesInfo nodesInfo) {
super(settings);
this.transport = transport;
this.restController = restController;
this.nodesInfo = nodesInfo;
}
@Override protected void doStart() throws ElasticSearchException {
transport.start();
if (logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
}
nodesInfo.putNodeAttribute("memcached_address", transport.boundAddress().publishAddress().toString());
}
@Override protected void doStop() throws ElasticSearchException {
nodesInfo.removeNodeAttribute("memcached_address");
transport.stop();
}
@Override protected void doClose() throws ElasticSearchException {
transport.close();
}
}

View File

@@ -0,0 +1,58 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import org.elasticsearch.util.Classes;
import org.elasticsearch.util.settings.Settings;
import static org.elasticsearch.util.guice.ModulesFactory.*;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedServerModule extends AbstractModule {
private final Settings settings;
public MemcachedServerModule(Settings settings) {
this.settings = settings;
}
@SuppressWarnings({"unchecked"}) @Override protected void configure() {
bind(MemcachedServer.class).asEagerSingleton();
Class<? extends Module> defaultMemcachedServerTransportModule = null;
try {
Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransport");
defaultMemcachedServerTransportModule = (Class<? extends Module>) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransportModule");
} catch (ClassNotFoundException e) {
// no netty one, ok...
if (settings.get("memcached.type") == null) {
// no explicit one is configured, bail
return;
}
}
Class<? extends Module> moduleClass = settings.getAsClass("memcached.type", defaultMemcachedServerTransportModule, "org.elasticsearch.memcached.", "MemcachedServerTransportModule");
createModule(moduleClass, settings).configure(binder());
}
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import org.elasticsearch.util.component.LifecycleComponent;
import org.elasticsearch.util.transport.BoundTransportAddress;
/**
* @author kimchy (shay.banon)
*/
public interface MemcachedServerTransport extends LifecycleComponent<MemcachedServerTransport> {
BoundTransportAddress boundAddress();
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached;
import org.elasticsearch.ElasticSearchException;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedTransportException extends ElasticSearchException {
public MemcachedTransportException(String msg) {
super(msg);
}
public MemcachedTransportException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import org.elasticsearch.memcached.MemcachedRestRequest;
import org.elasticsearch.rest.RestController;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedDispatcher extends SimpleChannelUpstreamHandler {
private final RestController restController;
public MemcachedDispatcher(RestController restController) {
this.restController = restController;
}
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
MemcachedRestRequest request = (MemcachedRestRequest) e.getMessage();
restController.dispatchRequest(request, new MemcachedRestChannel(ctx.getChannel(), request));
super.messageReceived(ctx, e);
}
}

View File

@@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import org.elasticsearch.memcached.MemcachedRestRequest;
import org.elasticsearch.memcached.MemcachedTransportException;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.util.Bytes;
import org.elasticsearch.util.Unicode;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
/**
* @author kimchy (shay.banon)
*/
public class MemcachedRestChannel implements RestChannel {
public static final ChannelBuffer CRLF = ChannelBuffers.copiedBuffer("\r\n", "US-ASCII");
private static final ChannelBuffer VALUE = ChannelBuffers.copiedBuffer("VALUE ", "US-ASCII");
private static final ChannelBuffer EXISTS = ChannelBuffers.copiedBuffer("EXISTS\r\n", "US-ASCII");
private static final ChannelBuffer NOT_FOUND = ChannelBuffers.copiedBuffer("NOT_FOUND\r\n", "US-ASCII");
private static final ChannelBuffer NOT_STORED = ChannelBuffers.copiedBuffer("NOT_STORED\r\n", "US-ASCII");
private static final ChannelBuffer STORED = ChannelBuffers.copiedBuffer("STORED\r\n", "US-ASCII");
private static final ChannelBuffer DELETED = ChannelBuffers.copiedBuffer("DELETED\r\n", "US-ASCII");
private static final ChannelBuffer END = ChannelBuffers.copiedBuffer("END\r\n", "US-ASCII");
private static final ChannelBuffer OK = ChannelBuffers.copiedBuffer("OK\r\n", "US-ASCII");
private static final ChannelBuffer ERROR = ChannelBuffers.copiedBuffer("ERROR\r\n", "US-ASCII");
private static final ChannelBuffer CLIENT_ERROR = ChannelBuffers.copiedBuffer("CLIENT_ERROR\r\n", "US-ASCII");
private final Channel channel;
private final MemcachedRestRequest request;
public MemcachedRestChannel(Channel channel, MemcachedRestRequest request) {
this.channel = channel;
this.request = request;
}
@Override public void sendResponse(RestResponse response) {
if (response.status().getStatus() >= 500) {
channel.write(ERROR.duplicate());
} else {
if (request.method() == RestRequest.Method.POST) {
// TODO this is SET, can we send a payload?
channel.write(STORED.duplicate());
} else if (request.method() == RestRequest.Method.DELETE) {
channel.write(DELETED.duplicate());
} else { // GET
try {
ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(response.contentLength() + 512);
writeBuffer.writeBytes(VALUE.duplicate());
writeBuffer.writeBytes(Unicode.fromStringAsBytes(request.uri()));
writeBuffer.writeByte((byte) ' ');
writeBuffer.writeByte((byte) '0');
writeBuffer.writeByte((byte) ' ');
writeBuffer.writeBytes(Bytes.itoa(response.contentLength()));
writeBuffer.writeByte((byte) '\r');
writeBuffer.writeByte((byte) '\n');
writeBuffer.writeBytes(response.content(), 0, response.contentLength());
writeBuffer.writeByte((byte) '\r');
writeBuffer.writeByte((byte) '\n');
writeBuffer.writeBytes(END.duplicate());
channel.write(writeBuffer);
} catch (Exception e) {
throw new MemcachedTransportException("Failed to write 'get' response", e);
}
}
}
}
}
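For the GET branch in sendResponse above, the buffer assembled line by line follows the memcached text protocol framing: the request URI is echoed back as the key, the flags field is always 0, and the REST response body is the value. Schematically (placeholders, not literal output):
VALUE <request-uri> 0 <content-length>\r\n
<REST response body>\r\n
END\r\n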

View File

@@ -0,0 +1,215 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.memcached.MemcachedServerTransport;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress;
import org.elasticsearch.util.transport.PortsRange;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.HostResolver.*;
/**
* @author kimchy (shay.banon)
*/
public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<MemcachedServerTransport> implements MemcachedServerTransport {
static {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory() {
@Override public InternalLogger newInstance(String name) {
return super.newInstance(name.replace("org.jboss.netty.", "netty."));
}
});
}
private final RestController restController;
private final int workerCount;
private final String port;
private final String bindHost;
private final String publishHost;
private final Boolean tcpNoDelay;
private final Boolean tcpKeepAlive;
private final Boolean reuseAddress;
private final SizeValue tcpSendBufferSize;
private final SizeValue tcpReceiveBufferSize;
private volatile ServerBootstrap serverBootstrap;
private volatile BoundTransportAddress boundAddress;
private volatile Channel serverChannel;
private volatile OpenChannelsHandler serverOpenChannels;
@Inject public NettyMemcachedServerTransport(Settings settings, RestController restController) {
super(settings);
this.restController = restController;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.port = componentSettings.get("port", "11211-11311");
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true);
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null);
this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null);
this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null);
}
@Override public BoundTransportAddress boundAddress() {
return boundAddress;
}
@Override protected void doStart() throws ElasticSearchException {
this.serverOpenChannels = new OpenChannelsHandler();
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedBoss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedIoWorker")),
workerCount));
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", serverOpenChannels);
pipeline.addLast("decoder", new TextMemcachedDecoder());
pipeline.addLast("dispatcher", new MemcachedDispatcher(restController));
return pipeline;
}
};
serverBootstrap.setPipelineFactory(pipelineFactory);
if (tcpNoDelay != null) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
}
if (tcpKeepAlive != null) {
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
}
if (tcpSendBufferSize != null) {
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
}
if (tcpReceiveBufferSize != null) {
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
}
if (reuseAddress != null) {
serverBootstrap.setOption("reuseAddress", reuseAddress);
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
}
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
hostAddressX = resolveBindHostAddress(bindHost, settings);
} catch (IOException e) {
throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
}
final InetAddress hostAddress = hostAddressX;
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override public boolean onPortNumber(int portNumber) {
try {
serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
}
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if it's 0.0.0.0, we can't publish that; default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}
} else {
publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort());
}
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
@Override protected void doStop() throws ElasticSearchException {
if (serverChannel != null) {
serverChannel.close().awaitUninterruptibly();
serverChannel = null;
}
if (serverOpenChannels != null) {
serverOpenChannels.close();
serverOpenChannels = null;
}
if (serverBootstrap != null) {
serverBootstrap.releaseExternalResources();
serverBootstrap = null;
}
}
@Override protected void doClose() throws ElasticSearchException {
}
}
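
The bind logic above walks a port range via PortsRange and keeps the first port that binds, remembering the last failure so it can be reported if every port in the range is taken. A minimal standalone sketch of that retry pattern, using plain java.net.ServerSocket instead of Netty and assuming a simple low-high range (the PortRangeBindSketch class below is illustration only, not part of this commit):

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class PortRangeBindSketch {

    // Walk the range and return the first socket that binds; keep the last
    // failure as the cause if nothing in the range is free.
    public static ServerSocket bindFirstFree(InetAddress host, int low, int high) throws IOException {
        IOException lastException = null;
        for (int port = low; port <= high; port++) {
            try {
                ServerSocket socket = new ServerSocket();
                socket.bind(new InetSocketAddress(host, port));
                return socket; // success, stop iterating
            } catch (IOException e) {
                lastException = e; // remember and try the next port
            }
        }
        throw new IOException("Failed to bind to any port in [" + low + "-" + high + "]", lastException);
    }

    public static void main(String[] args) throws IOException {
        ServerSocket socket = bindFirstFree(InetAddress.getByName("127.0.0.1"), 11211, 11311);
        System.out.println("bound to " + socket.getLocalSocketAddress());
        socket.close();
    }
}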

View File

@@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import com.google.inject.AbstractModule;
import org.elasticsearch.memcached.MemcachedServerTransport;
/**
* @author kimchy (shay.banon)
*/
public class NettyMemcachedServerTransportModule extends AbstractModule {
@Override protected void configure() {
bind(MemcachedServerTransport.class).to(NettyMemcachedServerTransport.class).asEagerSingleton();
}
}
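
The module binds the MemcachedServerTransport interface to the Netty implementation as an eager singleton, so the transport is constructed as soon as the injector is built rather than on first use. A self-contained sketch of the same binding style, using hypothetical Greeter/ConsoleGreeter types purely for illustration (the transport's real dependencies are wired elsewhere by the node's modules):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

interface Greeter {
    void greet();
}

class ConsoleGreeter implements Greeter {
    public ConsoleGreeter() {
        // asEagerSingleton means this constructor runs when the injector is created
        System.out.println("constructed eagerly");
    }

    @Override public void greet() {
        System.out.println("hello");
    }
}

public class EagerSingletonSketch extends AbstractModule {
    @Override protected void configure() {
        // same pattern as NettyMemcachedServerTransportModule: one instance,
        // created at injector-creation time, bound against the interface
        bind(Greeter.class).to(ConsoleGreeter.class).asEagerSingleton();
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new EagerSingletonSketch());
        injector.getInstance(Greeter.class).greet();
    }
}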

View File

@@ -0,0 +1,57 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet;
import org.jboss.netty.channel.*;
/**
* @author kimchy (shay.banon)
*/
@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
public class OpenChannelsHandler implements ChannelUpstreamHandler {
private NonBlockingHashSet<Channel> openChannels = new NonBlockingHashSet<Channel>();
private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
openChannels.remove(future.getChannel());
}
};
@Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
if (evt.getState() == ChannelState.OPEN) {
boolean added = openChannels.add(ctx.getChannel());
if (added) {
ctx.getChannel().getCloseFuture().addListener(remover);
}
}
}
ctx.sendUpstream(e);
}
public void close() {
for (Channel channel : openChannels) {
channel.close().awaitUninterruptibly();
}
}
}
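
The handler keeps every open channel in a lock-free set and removes it again via a listener on the channel's close future, so doStop() can force-close whatever clients are still connected. A conceptual analogue of the same "track open connections, close them all on shutdown" pattern with plain sockets (illustration only, not the Netty handler above):

import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class OpenConnectionsTrackerSketch {

    private final Set<Socket> open = ConcurrentHashMap.newKeySet();

    public void onOpen(Socket socket) {
        open.add(socket); // register when the connection is accepted
    }

    public void onClose(Socket socket) {
        open.remove(socket); // deregister when it closes normally
    }

    public void closeAll() {
        // shutdown path: force-close anything still registered
        for (Socket socket : open) {
            try {
                socket.close();
            } catch (Exception e) {
                // best effort on shutdown
            }
        }
    }
}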

View File

@@ -0,0 +1,151 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import org.elasticsearch.memcached.MemcachedRestRequest;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.util.Unicode;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import java.io.StreamCorruptedException;
import java.util.regex.Pattern;
/**
* @author kimchy (shay.banon)
*/
public class TextMemcachedDecoder extends FrameDecoder {
private final Pattern lineSplit = Pattern.compile(" +");
public static final byte CR = 13;
public static final byte LF = 10;
public static final byte[] CRLF = new byte[]{CR, LF};
public static enum Reply {
STORED, NOT_STORED, EXISTS, NOT_FOUND, DELETED, STAT, VALUE, END, OK, VERSION,
ERROR, CLIENT_ERROR, SERVER_ERROR;
private final byte[] bytes;
Reply() {
this.bytes = this.toString().getBytes();
}
public byte[] bytes() {
return bytes;
}
}
private volatile StringBuffer sb = new StringBuffer();
private volatile MemcachedRestRequest request;
public TextMemcachedDecoder() {
super(false);
}
@Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
MemcachedRestRequest request = this.request;
if (request == null) {
buffer.markReaderIndex();
// need to read a header
boolean done = false;
StringBuffer sb = this.sb;
int readableBytes = buffer.readableBytes();
for (int i = 0; i < readableBytes; i++) {
byte next = buffer.readByte();
if (next == CR) {
next = buffer.readByte();
if (next == LF) {
done = true;
break;
}
} else if (next == LF) {
done = true;
break;
} else {
sb.append((char) next);
}
}
if (!done) {
buffer.resetReaderIndex();
return null;
}
String[] args = lineSplit.split(sb);
// we read the text, clear it
sb.setLength(0);
String cmd = args[0];
String uri = args[1];
if ("get".equals(cmd)) {
request = new MemcachedRestRequest(RestRequest.Method.GET, uri, -1, false);
if (args.length > 3) {
request.setData(Unicode.fromStringAsBytes(args[2]));
}
return request;
} else if ("delete".equals(cmd)) {
request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, -1, false);
// if (args.length > 3) {
// request.setData(Unicode.fromStringAsBytes(args[2]));
// }
return request;
} else if ("set".equals(cmd)) {
this.request = new MemcachedRestRequest(RestRequest.Method.POST, uri, Integer.parseInt(args[4]), false);
buffer.markReaderIndex();
} else if ("quit".equals(cmd)) {
channel.disconnect();
}
} else {
if (buffer.readableBytes() < (request.getDataSize() + 2)) {
return null;
}
byte[] data = new byte[request.getDataSize()];
buffer.readBytes(data, 0, data.length);
byte next = buffer.readByte();
if (next == CR) {
next = buffer.readByte();
if (next == LF) {
request.setData(data);
// reset
this.request = null;
return request;
} else {
this.request = null;
throw new StreamCorruptedException("Expecting \r\n after data block");
}
} else {
this.request = null;
throw new StreamCorruptedException("Expecting \r\n after data block");
}
}
return null;
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
this.request = null;
sb.setLength(0);
e.getCause().printStackTrace();
}
}
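
The decoder reads one space-separated command line terminated by CR, LF, or CRLF; for set it then waits for exactly <bytes> bytes of payload followed by CRLF, where <bytes> is the fifth token of the standard memcached set line (set <key> <flags> <exptime> <bytes>). A raw-socket sketch of a client sending frames in that shape, assuming the transport is listening on localhost:11211 (an assumption; the port is not fixed by this file):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class RawMemcachedClientSketch {

    public static void main(String[] args) throws Exception {
        byte[] doc = "{\"test\":\"value\"}".getBytes(StandardCharsets.UTF_8);
        try (Socket socket = new Socket("localhost", 11211)) {
            OutputStream out = socket.getOutputStream();

            // set <key> <flags> <exptime> <bytes>\r\n, then <bytes> bytes of data, then \r\n
            String setLine = "set /test/person/1 0 0 " + doc.length + "\r\n";
            out.write(setLine.getBytes(StandardCharsets.US_ASCII));
            out.write(doc);
            out.write("\r\n".getBytes(StandardCharsets.US_ASCII));

            // get <key>\r\n
            out.write("get /test/person/1\r\n".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // dump whatever the server answers; the reply format is up to the response encoder
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4096];
            int n = in.read(buf);
            if (n > 0) {
                System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
            }
        }
    }
}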

View File

@@ -0,0 +1,79 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.test;
import net.spy.memcached.MemcachedClient;
import org.elasticsearch.node.Node;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.node.NodeBuilder.*;
import static org.elasticsearch.util.json.JsonBuilder.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class SimpleMemcachedActionsTests {
private Node node;
private MemcachedClient memcachedClient;
@BeforeMethod
public void setup() throws IOException {
node = nodeBuilder().node();
// NodesInfoResponse nodesInfo = node.client().admin().cluster().nodesInfo(nodesInfo()).actionGet();
memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211));
}
@AfterMethod
public void tearDown() {
memcachedClient.shutdown();
node.close();
}
@Test public void testSimpleOperations() throws Exception {
Future setResult = memcachedClient.set("/test/person/1", 0, binaryJsonBuilder().startObject().field("test", "value").endObject().copiedBytes());
setResult.get(10, TimeUnit.SECONDS);
String getResult = (String) memcachedClient.get("/_refresh");
System.out.println("REFRESH " + getResult);
getResult = (String) memcachedClient.get("/test/person/1");
System.out.println("GET " + getResult);
Future deleteResult = memcachedClient.delete("/test/person/1");
deleteResult.get(10, TimeUnit.SECONDS);
getResult = (String) memcachedClient.get("/_refresh");
System.out.println("REFRESH " + getResult);
getResult = (String) memcachedClient.get("/test/person/1");
System.out.println("GET " + getResult);
}
}
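
The test drives the transport through spymemcached and prints the results rather than asserting on them; the client is pointed at localhost:11211, which is assumed to be the transport's default port and is not configured anywhere in this commit. A standalone variant of the same round trip as a plain main method, handy for poking at a running node by hand (the MemcachedSmokeTest class name and the String payload are illustration only):

import net.spy.memcached.MemcachedClient;

import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MemcachedSmokeTest {

    public static void main(String[] args) throws Exception {
        // assumes a node with the memcached transport is already running on 11211
        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));
        try {
            Future<Boolean> set = client.set("/test/person/1", 0, "{\"test\":\"value\"}");
            System.out.println("SET ok: " + set.get(10, TimeUnit.SECONDS));

            System.out.println("REFRESH: " + client.get("/_refresh"));
            System.out.println("GET: " + client.get("/test/person/1"));

            Future<Boolean> delete = client.delete("/test/person/1");
            System.out.println("DELETE ok: " + delete.get(10, TimeUnit.SECONDS));
        } finally {
            client.shutdown();
        }
    }
}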

View File

@@ -8,6 +8,7 @@ include 'benchmark-micro'
include 'plugins-attachments'
include 'plugins-groovy'
include 'plugins-memcached'
rootProject.name = 'elasticsearch-root'
rootProject.children.each {project ->