diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 203fcd182f1..9fb4854b0bc 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -34,10 +34,12 @@
lifecycle
linefeeds
lucene
+ memcached
metadata
millis
mmap
multi
+ multiline
nanos
newcount
ngram
diff --git a/.idea/libraries/spymemcached.xml b/.idea/libraries/spymemcached.xml
new file mode 100644
index 00000000000..6655e882415
--- /dev/null
+++ b/.idea/libraries/spymemcached.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
index 7ca869a1a29..29f9e415dd4 100644
--- a/.idea/modules.xml
+++ b/.idea/modules.xml
@@ -7,6 +7,7 @@
+
diff --git a/.idea/modules/elasticsearch-root.iml b/.idea/modules/elasticsearch-root.iml
index 0e40769cd9e..bb7f1a17de8 100644
--- a/.idea/modules/elasticsearch-root.iml
+++ b/.idea/modules/elasticsearch-root.iml
@@ -13,6 +13,7 @@
+
diff --git a/.idea/modules/plugin-memcached.iml b/.idea/modules/plugin-memcached.iml
new file mode 100644
index 00000000000..373ea015c24
--- /dev/null
+++ b/.idea/modules/plugin-memcached.iml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java
index a4edcb21c4a..1aced805385 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java
@@ -109,7 +109,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl
if (System.getProperty("jgroups.bind_addr") == null) {
// automatically set the bind address based on ElasticSearch default bindings...
try {
- InetAddress bindAddress = HostResolver.resultBindHostAddress(null, settings, HostResolver.LOCAL_IP);
+ InetAddress bindAddress = HostResolver.resolveBindHostAddress(null, settings, HostResolver.LOCAL_IP);
if ((bindAddress instanceof Inet4Address && HostResolver.isIPv4()) || (bindAddress instanceof Inet6Address && !HostResolver.isIPv4())) {
sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress());
System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress());
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
index 53d79677074..bac47528e53 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
@@ -39,9 +39,9 @@ public class NettyHttpRequest extends AbstractRestRequest implements HttpRequest
private final org.jboss.netty.handler.codec.http.HttpRequest request;
- private Map params;
+ private final Map params;
- private String path;
+ private final String path;
public NettyHttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request) {
this.request = request;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
index 803310b2599..905c59eaf33 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
@@ -163,7 +163,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
- hostAddressX = resultBindHostAddress(bindHost, settings);
+ hostAddressX = resolveBindHostAddress(bindHost, settings);
} catch (IOException e) {
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
}
@@ -261,11 +261,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
- InetAddress publishAddressX = resultPublishHostAddress(publishHost, settings);
+ InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if its 0.0.0.0, we can't publish that.., default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
- publishAddress = new InetSocketAddress(resultPublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
+ publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java
index d1bf24f7f39..d5e37227c9e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java
@@ -25,4 +25,265 @@ package org.elasticsearch.util;
public class Bytes {
public static final byte[] EMPTY_ARRAY = new byte[0];
+
+
+ final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999,
+ 99999999, 999999999, Integer.MAX_VALUE};
+ private static final byte[] LONG_MIN_VALUE_BYTES = "-9223372036854775808".getBytes();
+
+ // Requires positive x
+
+ static int stringSize(int x) {
+ for (int i = 0; ; i++)
+ if (x <= sizeTable[i])
+ return i + 1;
+ }
+
+ /**
+ * Blatant copy of Integer.toString, but returning a byte array instead of a String, as
+ * string charset decoding/encoding was killing us on performance.
+ *
+ * @param i integer to convert
+ * @return byte[] array containing literal ASCII char representation
+ */
+ public static byte[] itoa(int i) {
+ int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
+ byte[] buf = new byte[size];
+ getChars(i, size, buf);
+ return buf;
+ }
+
+ final static byte[] digits = {
+ '0', '1', '2', '3', '4', '5',
+ '6', '7', '8', '9', 'a', 'b',
+ 'c', 'd', 'e', 'f', 'g', 'h',
+ 'i', 'j', 'k', 'l', 'm', 'n',
+ 'o', 'p', 'q', 'r', 's', 't',
+ 'u', 'v', 'w', 'x', 'y', 'z'
+ };
+
+ final static byte[] DigitTens = {
+ '0', '0', '0', '0', '0', '0', '0', '0', '0', '0',
+ '1', '1', '1', '1', '1', '1', '1', '1', '1', '1',
+ '2', '2', '2', '2', '2', '2', '2', '2', '2', '2',
+ '3', '3', '3', '3', '3', '3', '3', '3', '3', '3',
+ '4', '4', '4', '4', '4', '4', '4', '4', '4', '4',
+ '5', '5', '5', '5', '5', '5', '5', '5', '5', '5',
+ '6', '6', '6', '6', '6', '6', '6', '6', '6', '6',
+ '7', '7', '7', '7', '7', '7', '7', '7', '7', '7',
+ '8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
+ '9', '9', '9', '9', '9', '9', '9', '9', '9', '9',
+ };
+
+ final static byte[] DigitOnes = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ };
+
+ static void getChars(int i, int index, byte[] buf) {
+ int q, r;
+ int charPos = index;
+ byte sign = 0;
+
+ if (i < 0) {
+ sign = '-';
+ i = -i;
+ }
+
+ // Generate two digits per iteration
+ while (i >= 65536) {
+ q = i / 100;
+ // really: r = i - (q * 100);
+ r = i - ((q << 6) + (q << 5) + (q << 2));
+ i = q;
+ buf[--charPos] = DigitOnes[r];
+ buf[--charPos] = DigitTens[r];
+ }
+
+ // Fall thru to fast mode for smaller numbers
+ // assert(i <= 65536, i);
+ for (; ;) {
+ q = (i * 52429) >>> (16 + 3);
+ r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ...
+ buf[--charPos] = digits[r];
+ i = q;
+ if (i == 0) break;
+ }
+ if (sign != 0) {
+ buf[--charPos] = sign;
+ }
+ }
+
+ public static int atoi(byte[] s)
+ throws NumberFormatException {
+ int result = 0;
+ boolean negative = false;
+ int i = 0, len = s.length;
+ int limit = -Integer.MAX_VALUE;
+ int multmin;
+ int digit;
+
+ if (len > 0) {
+ byte firstChar = s[0];
+ if (firstChar < '0') { // Possible leading "-"
+ if (firstChar == '-') {
+ negative = true;
+ limit = Integer.MIN_VALUE;
+ } else
+ throw new NumberFormatException();
+
+ if (len == 1) // Cannot have lone "-"
+ throw new NumberFormatException();
+ i++;
+ }
+ multmin = limit / 10;
+ while (i < len) {
+ // Accumulating negatively avoids surprises near MAX_VALUE
+ digit = Character.digit(s[i++], 10);
+ if (digit < 0) {
+ throw new NumberFormatException();
+ }
+ if (result < multmin) {
+ throw new NumberFormatException();
+ }
+ result *= 10;
+ if (result < limit + digit) {
+ throw new NumberFormatException();
+ }
+ result -= digit;
+ }
+ } else {
+ throw new NumberFormatException();
+ }
+ return negative ? result : -result;
+ }
+
+ public static byte[] ltoa(long i) {
+ if (i == Long.MIN_VALUE)
+ return LONG_MIN_VALUE_BYTES;
+ int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
+ byte[] buf = new byte[size];
+ getChars(i, size, buf);
+ return buf;
+ }
+
+ /**
+ * Places characters representing the integer i into the
+ * character array buf. The characters are placed into
+ * the buffer backwards starting with the least significant
+ * digit at the specified index (exclusive), and working
+ * backwards from there.
+ *
+ * Will fail if i == Long.MIN_VALUE
+ */
+ static void getChars(long i, int index, byte[] buf) {
+ long q;
+ int r;
+ int charPos = index;
+ byte sign = 0;
+
+ if (i < 0) {
+ sign = '-';
+ i = -i;
+ }
+
+ // Get 2 digits/iteration using longs until quotient fits into an int
+ while (i > Integer.MAX_VALUE) {
+ q = i / 100;
+ // really: r = i - (q * 100);
+ r = (int) (i - ((q << 6) + (q << 5) + (q << 2)));
+ i = q;
+ buf[--charPos] = DigitOnes[r];
+ buf[--charPos] = DigitTens[r];
+ }
+
+ // Get 2 digits/iteration using ints
+ int q2;
+ int i2 = (int) i;
+ while (i2 >= 65536) {
+ q2 = i2 / 100;
+ // really: r = i2 - (q * 100);
+ r = i2 - ((q2 << 6) + (q2 << 5) + (q2 << 2));
+ i2 = q2;
+ buf[--charPos] = DigitOnes[r];
+ buf[--charPos] = DigitTens[r];
+ }
+
+ // Fall thru to fast mode for smaller numbers
+ // assert(i2 <= 65536, i2);
+ for (; ;) {
+ q2 = (i2 * 52429) >>> (16 + 3);
+ r = i2 - ((q2 << 3) + (q2 << 1)); // r = i2-(q2*10) ...
+ buf[--charPos] = digits[r];
+ i2 = q2;
+ if (i2 == 0) break;
+ }
+ if (sign != 0) {
+ buf[--charPos] = sign;
+ }
+ }
+
+ // Requires positive x
+
+ static int stringSize(long x) {
+ long p = 10;
+ for (int i = 1; i < 19; i++) {
+ if (x < p)
+ return i;
+ p = 10 * p;
+ }
+ return 19;
+ }
+
+ public static long atol(byte[] s)
+ throws NumberFormatException {
+ long result = 0;
+ boolean negative = false;
+ int i = 0, len = s.length;
+ long limit = -Long.MAX_VALUE;
+ long multmin;
+ int digit;
+
+ if (len > 0) {
+ byte firstChar = s[0];
+ if (firstChar < '0') { // Possible leading "-"
+ if (firstChar == '-') {
+ negative = true;
+ limit = Long.MIN_VALUE;
+ } else
+ throw new NumberFormatException();
+
+ if (len == 1) // Cannot have lone "-"
+ throw new NumberFormatException();
+ i++;
+ }
+ multmin = limit / 10;
+ while (i < len) {
+ // Accumulating negatively avoids surprises near MAX_VALUE
+ digit = Character.digit(s[i++], 10);
+ if (digit < 0) {
+ throw new NumberFormatException();
+ }
+ if (result < multmin) {
+ throw new NumberFormatException();
+ }
+ result *= 10;
+ if (result < limit + digit) {
+ throw new NumberFormatException();
+ }
+ result -= digit;
+ }
+ } else {
+ throw new NumberFormatException();
+ }
+ return negative ? result : -result;
+ }
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java
index f725fa10691..0a536ac4fb4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java
@@ -44,19 +44,19 @@ public abstract class HostResolver {
return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true");
}
- public static InetAddress resultBindHostAddress(String bindHost, Settings settings) throws IOException {
- return resultBindHostAddress(bindHost, settings, null);
+ public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException {
+ return resolveBindHostAddress(bindHost, settings, null);
}
- public static InetAddress resultBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
+ public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
- public static InetAddress resultPublishHostAddress(String publishHost, Settings settings) throws IOException {
- return resultPublishHostAddress(publishHost, settings, null);
+ public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException {
+ return resolvePublishHostAddress(publishHost, settings, null);
}
- public static InetAddress resultPublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
+ public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy
index e05521ba03f..6bd02abd734 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.client
/**
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
index 3fd03502767..bdcd4edb533 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.client
import org.elasticsearch.action.ActionListener
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy
index 24f41613557..61c207005d0 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.client
import org.elasticsearch.action.ActionListener
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy
index 35d59f5428d..57190f51f24 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.node
import org.elasticsearch.groovy.client.GClient
@@ -27,7 +46,7 @@ class GNode {
/**
* Start the node. If the node is already started, this method is no-op.
*/
- def getStart() {
+ def start() {
node.start()
this
}
@@ -35,15 +54,15 @@ class GNode {
/**
* Stops the node. If the node is already started, this method is no-op.
*/
- def getStop() {
+ def stop() {
node.stop()
this
}
/**
- * Closes the node (and {@link #stop} s if its running).
+ * Closes the node (and {@link #stop} s if its running).
*/
- def getClose() {
+ def close() {
node.close()
this
}
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy
index a363623631b..63a8586b25d 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.node
import org.elasticsearch.groovy.util.json.JsonBuilder
@@ -7,6 +26,8 @@ import org.elasticsearch.util.settings.ImmutableSettings
import org.elasticsearch.util.settings.loader.JsonSettingsLoader
/**
+ * The node builder allow to build a {@link GNode} instance.
+ *
* @author kimchy (shay.banon)
*/
public class GNodeBuilder {
@@ -24,12 +45,12 @@ public class GNodeBuilder {
settingsBuilder.put(new JsonSettingsLoader().load(settingsBytes))
}
- def getBuild() {
+ def build() {
Node node = new InternalNode(settingsBuilder.build(), loadConfigSettings)
new GNode(node)
}
- def getNode() {
- build.start
+ def node() {
+ build().start()
}
}
diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy
index 1d31817cfbd..4fcd6d5b25e 100644
--- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy
+++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.util.json
import org.elasticsearch.ElasticSearchGenerationException
diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy
index 70b4c9a9def..eeb41e0a411 100644
--- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy
+++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.test.client
import java.util.concurrent.CountDownLatch
@@ -28,12 +47,12 @@ class DifferentApiExecutionTests {
}
}
- node = nodeBuilder.node
+ node = nodeBuilder.node()
}
@AfterMethod
protected void tearDown() {
- node.close
+ node.close()
}
@Test
diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy
index 8554f2d93f1..b1fc5bea9f9 100644
--- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy
+++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.test.client
import org.elasticsearch.groovy.node.GNode
@@ -24,12 +43,12 @@ class SimpleActionsTests {
}
}
- node = nodeBuilder.node
+ node = nodeBuilder.node()
}
@AfterMethod
protected void tearDown() {
- node.close
+ node.close()
}
diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy
index 10725f84a58..6dc4b4eb023 100644
--- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy
+++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.test.node
import org.elasticsearch.groovy.node.GNode
@@ -19,7 +38,7 @@ class GNodeBuilderTests extends GroovyTestCase {
name = "test"
}
}
- GNode node = nodeBuilder.node
- node.stop.close
+ GNode node = nodeBuilder.node()
+ node.stop().close()
}
}
diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy
index ed956526834..5a51b0db818 100644
--- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy
+++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy
@@ -1,3 +1,22 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.elasticsearch.groovy.util.json
/**
diff --git a/plugins/memcached/build.gradle b/plugins/memcached/build.gradle
new file mode 100644
index 00000000000..55b7b40e8dc
--- /dev/null
+++ b/plugins/memcached/build.gradle
@@ -0,0 +1,140 @@
+dependsOn(':elasticsearch')
+
+apply plugin: 'java'
+apply plugin: 'maven'
+
+archivesBaseName = "elasticsearch-memcached"
+
+explodedDistDir = new File(distsDir, 'exploded')
+
+manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Memcached", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
+
+configurations.compile.transitive = true
+configurations.testCompile.transitive = true
+
+// no need to use the resource dir
+sourceSets.main.resources.srcDir 'src/main/java'
+sourceSets.test.resources.srcDir 'src/test/java'
+
+// add the source files to the dist jar
+jar {
+ from sourceSets.main.allJava
+}
+
+configurations {
+ dists
+ distLib {
+ visible = false
+ transitive = false
+ }
+}
+
+repositories {
+ mavenRepo urls: "http://bleu.west.spy.net/~dustin/m2repo/"
+}
+
+dependencies {
+ compile project(':elasticsearch')
+
+ testCompile project(':test-testng')
+ testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
+ testCompile 'spy:memcached:2.4.2'
+}
+
+test {
+ useTestNG()
+ jmvArgs = ["-ea", "-Xmx1024m"]
+ suiteName = project.name
+ listeners = ["org.elasticsearch.util.testng.Listeners"]
+ systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
+}
+
+task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
+ [explodedDistDir]*.mkdirs()
+
+ copy {
+ from configurations.distLib
+ into explodedDistDir
+ }
+
+ // remove elasticsearch files (compile above adds the elasticsearch one)
+ ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
+
+ copy {
+ from libsDir
+ into explodedDistDir
+ }
+
+ ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
+ ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
+}
+
+task zip(type: Zip, dependsOn: ['explodedDist']) {
+ from(explodedDistDir) {
+ }
+}
+
+task release(dependsOn: [zip]) << {
+ ant.delete(dir: explodedDistDir)
+ copy {
+ from distsDir
+ into(new File(rootProject.distsDir, "plugins"))
+ }
+}
+
+configurations {
+ deployerJars
+}
+
+dependencies {
+ deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
+}
+
+task sourcesJar(type: Jar, dependsOn: classes) {
+ classifier = 'sources'
+ from sourceSets.main.allSource
+}
+
+task javadocJar(type: Jar, dependsOn: javadoc) {
+ classifier = 'javadoc'
+ from javadoc.destinationDir
+}
+
+artifacts {
+ archives sourcesJar
+ archives javadocJar
+}
+
+uploadArchives {
+ repositories.mavenDeployer {
+ configuration = configurations.deployerJars
+ repository(url: rootProject.mavenRepoUrl) {
+ authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
+ }
+ snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
+ authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
+ }
+
+ pom.project {
+ inceptionYear '2009'
+ name 'elasticsearch-plugins-memcached'
+ description 'Memcacehd Plugin for ElasticSearch'
+ licenses {
+ license {
+ name 'The Apache Software License, Version 2.0'
+ url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+ distribution 'repo'
+ }
+ }
+ scm {
+ connection 'git://github.com/elasticsearch/elasticsearch.git'
+ developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
+ url 'http://github.com/elasticsearch/elasticsearch'
+ }
+ }
+
+ pom.whenConfigured {pom ->
+ pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
+ }
+ }
+}
\ No newline at end of file
diff --git a/plugins/memcached/src/main/java/es-plugin.properties b/plugins/memcached/src/main/java/es-plugin.properties
new file mode 100644
index 00000000000..42ab805aa06
--- /dev/null
+++ b/plugins/memcached/src/main/java/es-plugin.properties
@@ -0,0 +1 @@
+plugin=org.elasticsearch.memcached.MemcachedPlugin
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java
new file mode 100644
index 00000000000..30080f0def6
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import com.google.inject.Module;
+import org.elasticsearch.plugins.AbstractPlugin;
+import org.elasticsearch.util.component.LifecycleComponent;
+
+import java.util.Collection;
+
+import static com.google.common.collect.Lists.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedPlugin extends AbstractPlugin {
+
+ @Override public String name() {
+ return "memcached";
+ }
+
+ @Override public String description() {
+ return "Exports elasticsearch APIs over memcached";
+ }
+
+ @Override public Collection> modules() {
+ Collection> modules = newArrayList();
+ modules.add(MemcachedServerModule.class);
+ return modules;
+ }
+
+ @Override public Collection> services() {
+ Collection> services = newArrayList();
+ services.add(MemcachedServer.class);
+ return services;
+ }
+}
\ No newline at end of file
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java
new file mode 100644
index 00000000000..f2fea87bfdf
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.elasticsearch.rest.support.AbstractRestRequest;
+import org.elasticsearch.rest.support.RestUtils;
+import org.elasticsearch.util.Unicode;
+import org.elasticsearch.util.io.FastByteArrayInputStream;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedRestRequest extends AbstractRestRequest {
+
+ private final Method method;
+
+ private final String uri;
+
+ private final int dataSize;
+
+ private boolean binary;
+
+ private final Map params;
+
+ private final String path;
+
+ private byte[] data;
+
+ public MemcachedRestRequest(Method method, String uri, int dataSize, boolean binary) {
+ this.method = method;
+ this.uri = uri;
+ this.dataSize = dataSize;
+ this.binary = binary;
+ this.params = new HashMap();
+ int pathEndPos = uri.indexOf('?');
+ if (pathEndPos < 0) {
+ this.path = uri;
+ } else {
+ this.path = uri.substring(0, pathEndPos);
+ RestUtils.decodeQueryString(uri, pathEndPos + 1, params);
+ }
+ }
+
+ @Override public Method method() {
+ return this.method;
+ }
+
+ @Override public String uri() {
+ return this.uri;
+ }
+
+ @Override public String path() {
+ return this.path;
+ }
+
+ public int getDataSize() {
+ return dataSize;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ @Override public boolean hasContent() {
+ return data != null;
+ }
+
+ @Override public InputStream contentAsStream() {
+ return new FastByteArrayInputStream(data);
+ }
+
+ @Override public byte[] contentAsBytes() {
+ return data;
+ }
+
+ @Override public String contentAsString() {
+ return Unicode.fromBytes(data);
+ }
+
+ @Override public Set headerNames() {
+ return ImmutableSet.of();
+ }
+
+ @Override public String header(String name) {
+ return null;
+ }
+
+ @Override public List headers(String name) {
+ return ImmutableList.of();
+ }
+
+ @Override public String cookie() {
+ return null;
+ }
+
+ @Override public boolean hasParam(String key) {
+ return params.containsKey(key);
+ }
+
+ @Override public String param(String key) {
+ return params.get(key);
+ }
+
+ @Override public Map params() {
+ return params;
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java
new file mode 100644
index 00000000000..57ed95318cc
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import com.google.inject.Inject;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.util.component.AbstractLifecycleComponent;
+import org.elasticsearch.util.settings.Settings;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedServer extends AbstractLifecycleComponent {
+
+ private final MemcachedServerTransport transport;
+
+ private final TransportNodesInfo nodesInfo;
+
+ private final RestController restController;
+
+ @Inject public MemcachedServer(Settings settings, MemcachedServerTransport transport,
+ RestController restController, TransportNodesInfo nodesInfo) {
+ super(settings);
+ this.transport = transport;
+ this.restController = restController;
+ this.nodesInfo = nodesInfo;
+ }
+
+ @Override protected void doStart() throws ElasticSearchException {
+ transport.start();
+ if (logger.isInfoEnabled()) {
+ logger.info("{}", transport.boundAddress());
+ }
+ nodesInfo.putNodeAttribute("memcached_address", transport.boundAddress().publishAddress().toString());
+ }
+
+ @Override protected void doStop() throws ElasticSearchException {
+ nodesInfo.removeNodeAttribute("memcached_address");
+ transport.stop();
+ }
+
+ @Override protected void doClose() throws ElasticSearchException {
+ transport.close();
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java
new file mode 100644
index 00000000000..9c4c6c6df45
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import org.elasticsearch.util.Classes;
+import org.elasticsearch.util.settings.Settings;
+
+import static org.elasticsearch.util.guice.ModulesFactory.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedServerModule extends AbstractModule {
+
+ private final Settings settings;
+
+ public MemcachedServerModule(Settings settings) {
+ this.settings = settings;
+ }
+
+ @SuppressWarnings({"unchecked"}) @Override protected void configure() {
+ bind(MemcachedServer.class).asEagerSingleton();
+
+ Class extends Module> defaultMemcachedServerTransportModule = null;
+ try {
+ Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransport");
+ defaultMemcachedServerTransportModule = (Class extends Module>) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransportModule");
+ } catch (ClassNotFoundException e) {
+ // no netty one, ok...
+ if (settings.get("memcached.type") == null) {
+ // no explicit one is configured, bail
+ return;
+ }
+ }
+
+ Class extends Module> moduleClass = settings.getAsClass("memcached.type", defaultMemcachedServerTransportModule, "org.elasticsearch.memcached.", "MemcachedServerTransportModule");
+ createModule(moduleClass, settings).configure(binder());
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java
new file mode 100644
index 00000000000..063c0a2677c
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import org.elasticsearch.util.component.LifecycleComponent;
+import org.elasticsearch.util.transport.BoundTransportAddress;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public interface MemcachedServerTransport extends LifecycleComponent {
+
+ BoundTransportAddress boundAddress();
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java
new file mode 100644
index 00000000000..925630cdb70
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached;
+
+import org.elasticsearch.ElasticSearchException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedTransportException extends ElasticSearchException {
+
+ public MemcachedTransportException(String msg) {
+ super(msg);
+ }
+
+ public MemcachedTransportException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java
new file mode 100644
index 00000000000..c46fd6ffa06
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import org.elasticsearch.memcached.MemcachedRestRequest;
+import org.elasticsearch.rest.RestController;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedDispatcher extends SimpleChannelUpstreamHandler {
+
+ private final RestController restController;
+
+ public MemcachedDispatcher(RestController restController) {
+ this.restController = restController;
+ }
+
+ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ MemcachedRestRequest request = (MemcachedRestRequest) e.getMessage();
+ restController.dispatchRequest(request, new MemcachedRestChannel(ctx.getChannel(), request));
+ super.messageReceived(ctx, e);
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java
new file mode 100644
index 00000000000..a5f80597034
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import org.elasticsearch.memcached.MemcachedRestRequest;
+import org.elasticsearch.memcached.MemcachedTransportException;
+import org.elasticsearch.rest.RestChannel;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.util.Bytes;
+import org.elasticsearch.util.Unicode;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class MemcachedRestChannel implements RestChannel {
+
+ public static final ChannelBuffer CRLF = ChannelBuffers.copiedBuffer("\r\n", "US-ASCII");
+ private static final ChannelBuffer VALUE = ChannelBuffers.copiedBuffer("VALUE ", "US-ASCII");
+ private static final ChannelBuffer EXISTS = ChannelBuffers.copiedBuffer("EXISTS\r\n", "US-ASCII");
+ private static final ChannelBuffer NOT_FOUND = ChannelBuffers.copiedBuffer("NOT_FOUND\r\n", "US-ASCII");
+ private static final ChannelBuffer NOT_STORED = ChannelBuffers.copiedBuffer("NOT_STORED\r\n", "US-ASCII");
+ private static final ChannelBuffer STORED = ChannelBuffers.copiedBuffer("STORED\r\n", "US-ASCII");
+ private static final ChannelBuffer DELETED = ChannelBuffers.copiedBuffer("DELETED\r\n", "US-ASCII");
+ private static final ChannelBuffer END = ChannelBuffers.copiedBuffer("END\r\n", "US-ASCII");
+ private static final ChannelBuffer OK = ChannelBuffers.copiedBuffer("OK\r\n", "US-ASCII");
+ private static final ChannelBuffer ERROR = ChannelBuffers.copiedBuffer("ERROR\r\n", "US-ASCII");
+ private static final ChannelBuffer CLIENT_ERROR = ChannelBuffers.copiedBuffer("CLIENT_ERROR\r\n", "US-ASCII");
+
+ private final Channel channel;
+
+ private final MemcachedRestRequest request;
+
+ public MemcachedRestChannel(Channel channel, MemcachedRestRequest request) {
+ this.channel = channel;
+ this.request = request;
+ }
+
+ @Override public void sendResponse(RestResponse response) {
+ if (response.status().getStatus() >= 500) {
+ channel.write(ERROR.duplicate());
+ } else {
+ if (request.method() == RestRequest.Method.POST) {
+ // TODO this is SET, can we send a payload?
+ channel.write(STORED.duplicate());
+ } else if (request.method() == RestRequest.Method.DELETE) {
+ channel.write(DELETED.duplicate());
+ } else { // GET
+ try {
+ ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(response.contentLength() + 512);
+ writeBuffer.writeBytes(VALUE.duplicate());
+ writeBuffer.writeBytes(Unicode.fromStringAsBytes(request.uri()));
+ writeBuffer.writeByte((byte) ' ');
+ writeBuffer.writeByte((byte) '0');
+ writeBuffer.writeByte((byte) ' ');
+ writeBuffer.writeBytes(Bytes.itoa(response.contentLength()));
+ writeBuffer.writeByte((byte) '\r');
+ writeBuffer.writeByte((byte) '\n');
+ writeBuffer.writeBytes(response.content(), 0, response.contentLength());
+ writeBuffer.writeByte((byte) '\r');
+ writeBuffer.writeByte((byte) '\n');
+ writeBuffer.writeBytes(END.duplicate());
+ channel.write(writeBuffer);
+ } catch (Exception e) {
+ throw new MemcachedTransportException("Failed to write 'get' response", e);
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java
new file mode 100644
index 00000000000..45a14c56692
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import com.google.inject.Inject;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.http.BindHttpException;
+import org.elasticsearch.memcached.MemcachedServerTransport;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.transport.BindTransportException;
+import org.elasticsearch.util.SizeValue;
+import org.elasticsearch.util.component.AbstractLifecycleComponent;
+import org.elasticsearch.util.settings.Settings;
+import org.elasticsearch.util.transport.BoundTransportAddress;
+import org.elasticsearch.util.transport.InetSocketTransportAddress;
+import org.elasticsearch.util.transport.PortsRange;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.logging.InternalLogger;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
+import static org.elasticsearch.util.io.HostResolver.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class NettyMemcachedServerTransport extends AbstractLifecycleComponent implements MemcachedServerTransport {
+
+ static {
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory() {
+ @Override public InternalLogger newInstance(String name) {
+ return super.newInstance(name.replace("org.jboss.netty.", "netty."));
+ }
+ });
+ }
+
+ private final RestController restController;
+
+ private final int workerCount;
+
+ private final String port;
+
+ private final String bindHost;
+
+ private final String publishHost;
+
+ private final Boolean tcpNoDelay;
+
+ private final Boolean tcpKeepAlive;
+
+ private final Boolean reuseAddress;
+
+ private final SizeValue tcpSendBufferSize;
+
+ private final SizeValue tcpReceiveBufferSize;
+
+ private volatile ServerBootstrap serverBootstrap;
+
+ private volatile BoundTransportAddress boundAddress;
+
+ private volatile Channel serverChannel;
+
+ private volatile OpenChannelsHandler serverOpenChannels;
+
+ @Inject public NettyMemcachedServerTransport(Settings settings, RestController restController) {
+ super(settings);
+ this.restController = restController;
+
+ this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
+ this.port = componentSettings.get("port", "11211-11311");
+ this.bindHost = componentSettings.get("bind_host");
+ this.publishHost = componentSettings.get("publish_host");
+ this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true);
+ this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null);
+ this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null);
+ this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null);
+ this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null);
+ }
+
+ @Override public BoundTransportAddress boundAddress() {
+ return boundAddress;
+ }
+
+ @Override protected void doStart() throws ElasticSearchException {
+ this.serverOpenChannels = new OpenChannelsHandler();
+
+ serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedBoss")),
+ Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedIoWorker")),
+ workerCount));
+
+ ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
+ @Override public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("openChannels", serverOpenChannels);
+ pipeline.addLast("decoder", new TextMemcachedDecoder());
+ pipeline.addLast("dispatcher", new MemcachedDispatcher(restController));
+ return pipeline;
+ }
+ };
+
+ serverBootstrap.setPipelineFactory(pipelineFactory);
+
+ if (tcpNoDelay != null) {
+ serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
+ }
+ if (tcpKeepAlive != null) {
+ serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
+ }
+ if (tcpSendBufferSize != null) {
+ serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
+ }
+ if (tcpReceiveBufferSize != null) {
+ serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
+ }
+ if (reuseAddress != null) {
+ serverBootstrap.setOption("reuseAddress", reuseAddress);
+ serverBootstrap.setOption("child.reuseAddress", reuseAddress);
+ }
+
+ // Bind and start to accept incoming connections.
+ InetAddress hostAddressX;
+ try {
+ hostAddressX = resolveBindHostAddress(bindHost, settings);
+ } catch (IOException e) {
+ throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
+ }
+ final InetAddress hostAddress = hostAddressX;
+
+ PortsRange portsRange = new PortsRange(port);
+ final AtomicReference lastException = new AtomicReference();
+ boolean success = portsRange.iterate(new PortsRange.PortCallback() {
+ @Override public boolean onPortNumber(int portNumber) {
+ try {
+ serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
+ } catch (Exception e) {
+ lastException.set(e);
+ return false;
+ }
+ return true;
+ }
+ });
+ if (!success) {
+ throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
+ }
+
+ InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
+ InetSocketAddress publishAddress;
+ try {
+ InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
+ if (publishAddressX == null) {
+ // if its 0.0.0.0, we can't publish that.., default to the local ip address
+ if (boundAddress.getAddress().isAnyLocalAddress()) {
+ publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
+ } else {
+ publishAddress = boundAddress;
+ }
+ } else {
+ publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort());
+ }
+ } catch (Exception e) {
+ throw new BindTransportException("Failed to resolve publish address", e);
+ }
+ this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
+ }
+
+ @Override protected void doStop() throws ElasticSearchException {
+ if (serverChannel != null) {
+ serverChannel.close().awaitUninterruptibly();
+ serverChannel = null;
+ }
+
+ if (serverOpenChannels != null) {
+ serverOpenChannels.close();
+ serverOpenChannels = null;
+ }
+
+ if (serverBootstrap != null) {
+ serverBootstrap.releaseExternalResources();
+ serverBootstrap = null;
+ }
+ }
+
+ @Override protected void doClose() throws ElasticSearchException {
+ }
+}
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java
new file mode 100644
index 00000000000..2c210ce8b6f
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import com.google.inject.AbstractModule;
+import org.elasticsearch.memcached.MemcachedServerTransport;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class NettyMemcachedServerTransportModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(MemcachedServerTransport.class).to(NettyMemcachedServerTransport.class).asEagerSingleton();
+ }
+}
\ No newline at end of file
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java
new file mode 100644
index 00000000000..45793d5071a
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet;
+import org.jboss.netty.channel.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
+public class OpenChannelsHandler implements ChannelUpstreamHandler {
+
+ private NonBlockingHashSet openChannels = new NonBlockingHashSet();
+
+ private final ChannelFutureListener remover = new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) throws Exception {
+ openChannels.remove(future.getChannel());
+ }
+ };
+
+ @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
+ if (e instanceof ChannelStateEvent) {
+ ChannelStateEvent evt = (ChannelStateEvent) e;
+ if (evt.getState() == ChannelState.OPEN) {
+ boolean added = openChannels.add(ctx.getChannel());
+ if (added) {
+ ctx.getChannel().getCloseFuture().addListener(remover);
+ }
+ }
+ }
+ ctx.sendUpstream(e);
+ }
+
+ public void close() {
+ for (Channel channel : openChannels) {
+ channel.close().awaitUninterruptibly();
+ }
+ }
+}
\ No newline at end of file
diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java
new file mode 100644
index 00000000000..8993c298a44
--- /dev/null
+++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.netty;
+
+import org.elasticsearch.memcached.MemcachedRestRequest;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.util.Unicode;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import java.io.StreamCorruptedException;
+import java.util.regex.Pattern;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class TextMemcachedDecoder extends FrameDecoder {
+
+ private final Pattern lineSplit = Pattern.compile(" +");
+
+ public static final byte CR = 13;
+ public static final byte LF = 10;
+ public static final byte[] CRLF = new byte[]{CR, LF};
+
+ public static enum Reply {
+ STORED, NOT_STORED, EXISTS, NOT_FOUND, DELETED, STAT, VALUE, END, OK, VERSION,
+ ERROR, CLIENT_ERROR, SERVER_ERROR;
+
+ private final byte[] bytes;
+
+ Reply() {
+ this.bytes = this.toString().getBytes();
+ }
+
+ public byte[] bytes() {
+ return bytes;
+ }
+ }
+
+ private volatile StringBuffer sb = new StringBuffer();
+
+ private volatile MemcachedRestRequest request;
+
+ public TextMemcachedDecoder() {
+ super(false);
+ }
+
+ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+ MemcachedRestRequest request = this.request;
+ if (request == null) {
+ buffer.markReaderIndex();
+ // need to read a header
+ boolean done = false;
+ StringBuffer sb = this.sb;
+ int readableBytes = buffer.readableBytes();
+ for (int i = 0; i < readableBytes; i++) {
+ byte next = buffer.readByte();
+ if (next == CR) {
+ next = buffer.readByte();
+ if (next == LF) {
+ done = true;
+ break;
+ }
+ } else if (next == LF) {
+ done = true;
+ break;
+ } else {
+ sb.append((char) next);
+ }
+ }
+ if (!done) {
+ buffer.resetReaderIndex();
+ return null;
+ }
+
+ String[] args = lineSplit.split(sb);
+ // we read the text, clear it
+ sb.setLength(0);
+
+ String cmd = args[0];
+ String uri = args[1];
+ if ("get".equals(cmd)) {
+ request = new MemcachedRestRequest(RestRequest.Method.GET, uri, -1, false);
+ if (args.length > 3) {
+ request.setData(Unicode.fromStringAsBytes(args[2]));
+ }
+ return request;
+ } else if ("delete".equals(cmd)) {
+ request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, -1, false);
+// if (args.length > 3) {
+// request.setData(Unicode.fromStringAsBytes(args[2]));
+// }
+ return request;
+ } else if ("set".equals(cmd)) {
+ this.request = new MemcachedRestRequest(RestRequest.Method.POST, uri, Integer.parseInt(args[4]), false);
+ buffer.markReaderIndex();
+ } else if ("quit".equals(cmd)) {
+ channel.disconnect();
+ }
+ } else {
+ if (buffer.readableBytes() < (request.getDataSize() + 2)) {
+ return null;
+ }
+ byte[] data = new byte[request.getDataSize()];
+ buffer.readBytes(data, 0, data.length);
+ byte next = buffer.readByte();
+ if (next == CR) {
+ next = buffer.readByte();
+ if (next == LF) {
+ request.setData(data);
+ // reset
+ this.request = null;
+ return request;
+ } else {
+ this.request = null;
+ throw new StreamCorruptedException("Expecting \r\n after data block");
+ }
+ } else {
+ this.request = null;
+ throw new StreamCorruptedException("Expecting \r\n after data block");
+ }
+ }
+ return null;
+ }
+
+ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ this.request = null;
+ sb.setLength(0);
+ e.getCause().printStackTrace();
+ }
+}
diff --git a/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java b/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java
new file mode 100644
index 00000000000..6379deef245
--- /dev/null
+++ b/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.memcached.test;
+
+import net.spy.memcached.MemcachedClient;
+import org.elasticsearch.node.Node;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.node.NodeBuilder.*;
+import static org.elasticsearch.util.json.JsonBuilder.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@Test
+public class SimpleMemcachedActionsTests {
+
+ private Node node;
+
+ private MemcachedClient memcachedClient;
+
+ @BeforeMethod
+ public void setup() throws IOException {
+ node = nodeBuilder().node();
+
+// NodesInfoResponse nodesInfo = node.client().admin().cluster().nodesInfo(nodesInfo()).actionGet();
+ memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211));
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ memcachedClient.shutdown();
+ node.close();
+ }
+
+ @Test public void testSimpleOperations() throws Exception {
+ Future setResult = memcachedClient.set("/test/person/1", 0, binaryJsonBuilder().startObject().field("test", "value").endObject().copiedBytes());
+ setResult.get(10, TimeUnit.SECONDS);
+
+ String getResult = (String) memcachedClient.get("/_refresh");
+ System.out.println("REFRESH " + getResult);
+
+ getResult = (String) memcachedClient.get("/test/person/1");
+ System.out.println("GET " + getResult);
+
+ Future deleteResult = memcachedClient.delete("/test/person/1");
+ deleteResult.get(10, TimeUnit.SECONDS);
+
+ getResult = (String) memcachedClient.get("/_refresh");
+ System.out.println("REFRESH " + getResult);
+
+ getResult = (String) memcachedClient.get("/test/person/1");
+ System.out.println("GET " + getResult);
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index c7fbb21f713..32376c70e9f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,6 +8,7 @@ include 'benchmark-micro'
include 'plugins-attachments'
include 'plugins-groovy'
+include 'plugins-memcached'
rootProject.name = 'elasticsearch-root'
rootProject.children.each {project ->