From 51aac0cdf7d1a2c76beb0235ec4535030aadaa23 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 16 Apr 2010 22:01:16 +0300 Subject: [PATCH] memcached initial work --- .idea/dictionaries/kimchy.xml | 2 + .idea/libraries/spymemcached.xml | 9 + .idea/modules.xml | 1 + .idea/modules/elasticsearch-root.iml | 1 + .idea/modules/plugin-memcached.iml | 21 ++ .../discovery/jgroups/JgroupsDiscovery.java | 2 +- .../http/netty/NettyHttpRequest.java | 4 +- .../http/netty/NettyHttpServerTransport.java | 6 +- .../http/netty/OpenChannelsHandler.java | 2 +- .../org/elasticsearch/jmx/JmxService.java | 2 +- .../clear/RestClearIndicesCacheAction.java | 3 + .../indices/create/RestCreateIndexAction.java | 1 + .../admin/indices/flush/RestFlushAction.java | 3 + .../mapping/put/RestPutMappingAction.java | 3 + .../indices/optimize/RestOptimizeAction.java | 3 + .../indices/refresh/RestRefreshAction.java | 3 + .../rest/action/index/RestIndexAction.java | 2 + .../transport/netty/NettyTransport.java | 6 +- .../java/org/elasticsearch/util/Bytes.java | 261 ++++++++++++++++++ .../elasticsearch/util/io/HostResolver.java | 12 +- .../groovy/client/GAdminClient.groovy | 19 ++ .../groovy/client/GClusterAdminClient.groovy | 19 ++ .../groovy/client/GIndicesAdminClient.groovy | 19 ++ .../elasticsearch/groovy/node/GNode.groovy | 27 +- .../groovy/node/GNodeBuilder.groovy | 27 +- .../groovy/util/json/JsonBuilder.groovy | 19 ++ .../client/DifferentApiExecutionTests.groovy | 23 +- .../test/client/SimpleActionsTests.groovy | 23 +- .../groovy/test/node/GNodeBuilderTests.groovy | 23 +- .../groovy/util/json/JsonBuilderTests.groovy | 19 ++ plugins/memcached/build.gradle | 140 ++++++++++ .../src/main/java/es-plugin.properties | 1 + .../memcached/MemcachedPlugin.java | 54 ++++ .../memcached/MemcachedRestRequest.java | 132 +++++++++ .../memcached/MemcachedServer.java | 64 +++++ .../memcached/MemcachedServerModule.java | 58 ++++ .../memcached/MemcachedServerTransport.java | 31 +++ .../MemcachedTransportException.java | 36 +++ .../memcached/netty/MemcachedDispatcher.java | 44 +++ .../memcached/netty/MemcachedRestChannel.java | 90 ++++++ .../netty/NettyMemcachedServerTransport.java | 215 +++++++++++++++ .../NettyMemcachedServerTransportModule.java | 33 +++ .../memcached/netty/OpenChannelsHandler.java | 57 ++++ .../memcached/netty/TextMemcachedDecoder.java | 151 ++++++++++ .../test/SimpleMemcachedActionsTests.java | 79 ++++++ settings.gradle | 1 + 46 files changed, 1721 insertions(+), 30 deletions(-) create mode 100644 .idea/libraries/spymemcached.xml create mode 100644 .idea/modules/plugin-memcached.iml create mode 100644 plugins/memcached/build.gradle create mode 100644 plugins/memcached/src/main/java/es-plugin.properties create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java create mode 100644 plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java create mode 100644 plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 203fcd182f1..9fb4854b0bc 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -34,10 +34,12 @@ lifecycle linefeeds lucene + memcached metadata millis mmap multi + multiline nanos newcount ngram diff --git a/.idea/libraries/spymemcached.xml b/.idea/libraries/spymemcached.xml new file mode 100644 index 00000000000..6655e882415 --- /dev/null +++ b/.idea/libraries/spymemcached.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml index 7ca869a1a29..29f9e415dd4 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -7,6 +7,7 @@ + diff --git a/.idea/modules/elasticsearch-root.iml b/.idea/modules/elasticsearch-root.iml index 0e40769cd9e..bb7f1a17de8 100644 --- a/.idea/modules/elasticsearch-root.iml +++ b/.idea/modules/elasticsearch-root.iml @@ -13,6 +13,7 @@ + diff --git a/.idea/modules/plugin-memcached.iml b/.idea/modules/plugin-memcached.iml new file mode 100644 index 00000000000..373ea015c24 --- /dev/null +++ b/.idea/modules/plugin-memcached.iml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java index a4edcb21c4a..1aced805385 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java @@ -109,7 +109,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl if (System.getProperty("jgroups.bind_addr") == null) { // automatically set the bind address based on ElasticSearch default bindings... try { - InetAddress bindAddress = HostResolver.resultBindHostAddress(null, settings, HostResolver.LOCAL_IP); + InetAddress bindAddress = HostResolver.resolveBindHostAddress(null, settings, HostResolver.LOCAL_IP); if ((bindAddress instanceof Inet4Address && HostResolver.isIPv4()) || (bindAddress instanceof Inet6Address && !HostResolver.isIPv4())) { sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress()); System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java index 53d79677074..bac47528e53 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java @@ -39,9 +39,9 @@ public class NettyHttpRequest extends AbstractRestRequest implements HttpRequest private final org.jboss.netty.handler.codec.http.HttpRequest request; - private Map params; + private final Map params; - private String path; + private final String path; public NettyHttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request) { this.request = request; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 803310b2599..905c59eaf33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -163,7 +163,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem // Bind and start to accept incoming connections. InetAddress hostAddressX; try { - hostAddressX = resultBindHostAddress(bindHost, settings); + hostAddressX = resolveBindHostAddress(bindHost, settings); } catch (IOException e) { throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e); } @@ -261,11 +261,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); InetSocketAddress publishAddress; try { - InetAddress publishAddressX = resultPublishHostAddress(publishHost, settings); + InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings); if (publishAddressX == null) { // if its 0.0.0.0, we can't publish that.., default to the local ip address if (boundAddress.getAddress().isAnyLocalAddress()) { - publishAddress = new InetSocketAddress(resultPublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort()); + publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort()); } else { publishAddress = boundAddress; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java index d1bf24f7f39..d5e37227c9e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Bytes.java @@ -25,4 +25,265 @@ package org.elasticsearch.util; public class Bytes { public static final byte[] EMPTY_ARRAY = new byte[0]; + + + final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, + 99999999, 999999999, Integer.MAX_VALUE}; + private static final byte[] LONG_MIN_VALUE_BYTES = "-9223372036854775808".getBytes(); + + // Requires positive x + + static int stringSize(int x) { + for (int i = 0; ; i++) + if (x <= sizeTable[i]) + return i + 1; + } + + /** + * Blatant copy of Integer.toString, but returning a byte array instead of a String, as + * string charset decoding/encoding was killing us on performance. + * + * @param i integer to convert + * @return byte[] array containing literal ASCII char representation + */ + public static byte[] itoa(int i) { + int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i); + byte[] buf = new byte[size]; + getChars(i, size, buf); + return buf; + } + + final static byte[] digits = { + '0', '1', '2', '3', '4', '5', + '6', '7', '8', '9', 'a', 'b', + 'c', 'd', 'e', 'f', 'g', 'h', + 'i', 'j', 'k', 'l', 'm', 'n', + 'o', 'p', 'q', 'r', 's', 't', + 'u', 'v', 'w', 'x', 'y', 'z' + }; + + final static byte[] DigitTens = { + '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', + '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', + '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', + '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', + '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', + '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', + '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', + '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', + '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', + '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', + }; + + final static byte[] DigitOnes = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + }; + + static void getChars(int i, int index, byte[] buf) { + int q, r; + int charPos = index; + byte sign = 0; + + if (i < 0) { + sign = '-'; + i = -i; + } + + // Generate two digits per iteration + while (i >= 65536) { + q = i / 100; + // really: r = i - (q * 100); + r = i - ((q << 6) + (q << 5) + (q << 2)); + i = q; + buf[--charPos] = DigitOnes[r]; + buf[--charPos] = DigitTens[r]; + } + + // Fall thru to fast mode for smaller numbers + // assert(i <= 65536, i); + for (; ;) { + q = (i * 52429) >>> (16 + 3); + r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ... + buf[--charPos] = digits[r]; + i = q; + if (i == 0) break; + } + if (sign != 0) { + buf[--charPos] = sign; + } + } + + public static int atoi(byte[] s) + throws NumberFormatException { + int result = 0; + boolean negative = false; + int i = 0, len = s.length; + int limit = -Integer.MAX_VALUE; + int multmin; + int digit; + + if (len > 0) { + byte firstChar = s[0]; + if (firstChar < '0') { // Possible leading "-" + if (firstChar == '-') { + negative = true; + limit = Integer.MIN_VALUE; + } else + throw new NumberFormatException(); + + if (len == 1) // Cannot have lone "-" + throw new NumberFormatException(); + i++; + } + multmin = limit / 10; + while (i < len) { + // Accumulating negatively avoids surprises near MAX_VALUE + digit = Character.digit(s[i++], 10); + if (digit < 0) { + throw new NumberFormatException(); + } + if (result < multmin) { + throw new NumberFormatException(); + } + result *= 10; + if (result < limit + digit) { + throw new NumberFormatException(); + } + result -= digit; + } + } else { + throw new NumberFormatException(); + } + return negative ? result : -result; + } + + public static byte[] ltoa(long i) { + if (i == Long.MIN_VALUE) + return LONG_MIN_VALUE_BYTES; + int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i); + byte[] buf = new byte[size]; + getChars(i, size, buf); + return buf; + } + + /** + * Places characters representing the integer i into the + * character array buf. The characters are placed into + * the buffer backwards starting with the least significant + * digit at the specified index (exclusive), and working + * backwards from there. + * + * Will fail if i == Long.MIN_VALUE + */ + static void getChars(long i, int index, byte[] buf) { + long q; + int r; + int charPos = index; + byte sign = 0; + + if (i < 0) { + sign = '-'; + i = -i; + } + + // Get 2 digits/iteration using longs until quotient fits into an int + while (i > Integer.MAX_VALUE) { + q = i / 100; + // really: r = i - (q * 100); + r = (int) (i - ((q << 6) + (q << 5) + (q << 2))); + i = q; + buf[--charPos] = DigitOnes[r]; + buf[--charPos] = DigitTens[r]; + } + + // Get 2 digits/iteration using ints + int q2; + int i2 = (int) i; + while (i2 >= 65536) { + q2 = i2 / 100; + // really: r = i2 - (q * 100); + r = i2 - ((q2 << 6) + (q2 << 5) + (q2 << 2)); + i2 = q2; + buf[--charPos] = DigitOnes[r]; + buf[--charPos] = DigitTens[r]; + } + + // Fall thru to fast mode for smaller numbers + // assert(i2 <= 65536, i2); + for (; ;) { + q2 = (i2 * 52429) >>> (16 + 3); + r = i2 - ((q2 << 3) + (q2 << 1)); // r = i2-(q2*10) ... + buf[--charPos] = digits[r]; + i2 = q2; + if (i2 == 0) break; + } + if (sign != 0) { + buf[--charPos] = sign; + } + } + + // Requires positive x + + static int stringSize(long x) { + long p = 10; + for (int i = 1; i < 19; i++) { + if (x < p) + return i; + p = 10 * p; + } + return 19; + } + + public static long atol(byte[] s) + throws NumberFormatException { + long result = 0; + boolean negative = false; + int i = 0, len = s.length; + long limit = -Long.MAX_VALUE; + long multmin; + int digit; + + if (len > 0) { + byte firstChar = s[0]; + if (firstChar < '0') { // Possible leading "-" + if (firstChar == '-') { + negative = true; + limit = Long.MIN_VALUE; + } else + throw new NumberFormatException(); + + if (len == 1) // Cannot have lone "-" + throw new NumberFormatException(); + i++; + } + multmin = limit / 10; + while (i < len) { + // Accumulating negatively avoids surprises near MAX_VALUE + digit = Character.digit(s[i++], 10); + if (digit < 0) { + throw new NumberFormatException(); + } + if (result < multmin) { + throw new NumberFormatException(); + } + result *= 10; + if (result < limit + digit) { + throw new NumberFormatException(); + } + result -= digit; + } + } else { + throw new NumberFormatException(); + } + return negative ? result : -result; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java index f725fa10691..0a536ac4fb4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/HostResolver.java @@ -44,19 +44,19 @@ public abstract class HostResolver { return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true"); } - public static InetAddress resultBindHostAddress(String bindHost, Settings settings) throws IOException { - return resultBindHostAddress(bindHost, settings, null); + public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException { + return resolveBindHostAddress(bindHost, settings, null); } - public static InetAddress resultBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException { + public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException { return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2); } - public static InetAddress resultPublishHostAddress(String publishHost, Settings settings) throws IOException { - return resultPublishHostAddress(publishHost, settings, null); + public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException { + return resolvePublishHostAddress(publishHost, settings, null); } - public static InetAddress resultPublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException { + public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException { return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2); } diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy index e05521ba03f..6bd02abd734 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GAdminClient.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.client /** diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy index 3fd03502767..bdcd4edb533 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.client import org.elasticsearch.action.ActionListener diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy index 24f41613557..61c207005d0 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/client/GIndicesAdminClient.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.client import org.elasticsearch.action.ActionListener diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy index 35d59f5428d..57190f51f24 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNode.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.node import org.elasticsearch.groovy.client.GClient @@ -27,7 +46,7 @@ class GNode { /** * Start the node. If the node is already started, this method is no-op. */ - def getStart() { + def start() { node.start() this } @@ -35,15 +54,15 @@ class GNode { /** * Stops the node. If the node is already started, this method is no-op. */ - def getStop() { + def stop() { node.stop() this } /** - * Closes the node (and {@link #stop} s if its running). + * Closes the node (and {@link #stop} s if its running). */ - def getClose() { + def close() { node.close() this } diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy index a363623631b..63a8586b25d 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/node/GNodeBuilder.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.node import org.elasticsearch.groovy.util.json.JsonBuilder @@ -7,6 +26,8 @@ import org.elasticsearch.util.settings.ImmutableSettings import org.elasticsearch.util.settings.loader.JsonSettingsLoader /** + * The node builder allow to build a {@link GNode} instance. + * * @author kimchy (shay.banon) */ public class GNodeBuilder { @@ -24,12 +45,12 @@ public class GNodeBuilder { settingsBuilder.put(new JsonSettingsLoader().load(settingsBytes)) } - def getBuild() { + def build() { Node node = new InternalNode(settingsBuilder.build(), loadConfigSettings) new GNode(node) } - def getNode() { - build.start + def node() { + build().start() } } diff --git a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy index 1d31817cfbd..4fcd6d5b25e 100644 --- a/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy +++ b/plugins/groovy/src/main/groovy/org/elasticsearch/groovy/util/json/JsonBuilder.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.util.json import org.elasticsearch.ElasticSearchGenerationException diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy index 70b4c9a9def..eeb41e0a411 100644 --- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy +++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/DifferentApiExecutionTests.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.test.client import java.util.concurrent.CountDownLatch @@ -28,12 +47,12 @@ class DifferentApiExecutionTests { } } - node = nodeBuilder.node + node = nodeBuilder.node() } @AfterMethod protected void tearDown() { - node.close + node.close() } @Test diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy index 8554f2d93f1..b1fc5bea9f9 100644 --- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy +++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/client/SimpleActionsTests.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.test.client import org.elasticsearch.groovy.node.GNode @@ -24,12 +43,12 @@ class SimpleActionsTests { } } - node = nodeBuilder.node + node = nodeBuilder.node() } @AfterMethod protected void tearDown() { - node.close + node.close() } diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy index 10725f84a58..6dc4b4eb023 100644 --- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy +++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/test/node/GNodeBuilderTests.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.test.node import org.elasticsearch.groovy.node.GNode @@ -19,7 +38,7 @@ class GNodeBuilderTests extends GroovyTestCase { name = "test" } } - GNode node = nodeBuilder.node - node.stop.close + GNode node = nodeBuilder.node() + node.stop().close() } } diff --git a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy index ed956526834..5a51b0db818 100644 --- a/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy +++ b/plugins/groovy/src/test/groovy/org/elasticsearch/groovy/util/json/JsonBuilderTests.groovy @@ -1,3 +1,22 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.groovy.util.json /** diff --git a/plugins/memcached/build.gradle b/plugins/memcached/build.gradle new file mode 100644 index 00000000000..55b7b40e8dc --- /dev/null +++ b/plugins/memcached/build.gradle @@ -0,0 +1,140 @@ +dependsOn(':elasticsearch') + +apply plugin: 'java' +apply plugin: 'maven' + +archivesBaseName = "elasticsearch-memcached" + +explodedDistDir = new File(distsDir, 'exploded') + +manifest.mainAttributes("Implementation-Title": "ElasticSearch::Plugins::Memcached", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) + +configurations.compile.transitive = true +configurations.testCompile.transitive = true + +// no need to use the resource dir +sourceSets.main.resources.srcDir 'src/main/java' +sourceSets.test.resources.srcDir 'src/test/java' + +// add the source files to the dist jar +jar { + from sourceSets.main.allJava +} + +configurations { + dists + distLib { + visible = false + transitive = false + } +} + +repositories { + mavenRepo urls: "http://bleu.west.spy.net/~dustin/m2repo/" +} + +dependencies { + compile project(':elasticsearch') + + testCompile project(':test-testng') + testCompile('org.testng:testng:5.10:jdk15') { transitive = false } + testCompile 'spy:memcached:2.4.2' +} + +test { + useTestNG() + jmvArgs = ["-ea", "-Xmx1024m"] + suiteName = project.name + listeners = ["org.elasticsearch.util.testng.Listeners"] + systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties") +} + +task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << { + [explodedDistDir]*.mkdirs() + + copy { + from configurations.distLib + into explodedDistDir + } + + // remove elasticsearch files (compile above adds the elasticsearch one) + ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") } + + copy { + from libsDir + into explodedDistDir + } + + ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") } + ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") } +} + +task zip(type: Zip, dependsOn: ['explodedDist']) { + from(explodedDistDir) { + } +} + +task release(dependsOn: [zip]) << { + ant.delete(dir: explodedDistDir) + copy { + from distsDir + into(new File(rootProject.distsDir, "plugins")) + } +} + +configurations { + deployerJars +} + +dependencies { + deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2" +} + +task sourcesJar(type: Jar, dependsOn: classes) { + classifier = 'sources' + from sourceSets.main.allSource +} + +task javadocJar(type: Jar, dependsOn: javadoc) { + classifier = 'javadoc' + from javadoc.destinationDir +} + +artifacts { + archives sourcesJar + archives javadocJar +} + +uploadArchives { + repositories.mavenDeployer { + configuration = configurations.deployerJars + repository(url: rootProject.mavenRepoUrl) { + authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass) + } + snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) { + authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass) + } + + pom.project { + inceptionYear '2009' + name 'elasticsearch-plugins-memcached' + description 'Memcacehd Plugin for ElasticSearch' + licenses { + license { + name 'The Apache Software License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + distribution 'repo' + } + } + scm { + connection 'git://github.com/elasticsearch/elasticsearch.git' + developerConnection 'git@github.com:elasticsearch/elasticsearch.git' + url 'http://github.com/elasticsearch/elasticsearch' + } + } + + pom.whenConfigured {pom -> + pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones + } + } +} \ No newline at end of file diff --git a/plugins/memcached/src/main/java/es-plugin.properties b/plugins/memcached/src/main/java/es-plugin.properties new file mode 100644 index 00000000000..42ab805aa06 --- /dev/null +++ b/plugins/memcached/src/main/java/es-plugin.properties @@ -0,0 +1 @@ +plugin=org.elasticsearch.memcached.MemcachedPlugin diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java new file mode 100644 index 00000000000..30080f0def6 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedPlugin.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import com.google.inject.Module; +import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.util.component.LifecycleComponent; + +import java.util.Collection; + +import static com.google.common.collect.Lists.*; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedPlugin extends AbstractPlugin { + + @Override public String name() { + return "memcached"; + } + + @Override public String description() { + return "Exports elasticsearch APIs over memcached"; + } + + @Override public Collection> modules() { + Collection> modules = newArrayList(); + modules.add(MemcachedServerModule.class); + return modules; + } + + @Override public Collection> services() { + Collection> services = newArrayList(); + services.add(MemcachedServer.class); + return services; + } +} \ No newline at end of file diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java new file mode 100644 index 00000000000..f2fea87bfdf --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedRestRequest.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.elasticsearch.rest.support.AbstractRestRequest; +import org.elasticsearch.rest.support.RestUtils; +import org.elasticsearch.util.Unicode; +import org.elasticsearch.util.io.FastByteArrayInputStream; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedRestRequest extends AbstractRestRequest { + + private final Method method; + + private final String uri; + + private final int dataSize; + + private boolean binary; + + private final Map params; + + private final String path; + + private byte[] data; + + public MemcachedRestRequest(Method method, String uri, int dataSize, boolean binary) { + this.method = method; + this.uri = uri; + this.dataSize = dataSize; + this.binary = binary; + this.params = new HashMap(); + int pathEndPos = uri.indexOf('?'); + if (pathEndPos < 0) { + this.path = uri; + } else { + this.path = uri.substring(0, pathEndPos); + RestUtils.decodeQueryString(uri, pathEndPos + 1, params); + } + } + + @Override public Method method() { + return this.method; + } + + @Override public String uri() { + return this.uri; + } + + @Override public String path() { + return this.path; + } + + public int getDataSize() { + return dataSize; + } + + public void setData(byte[] data) { + this.data = data; + } + + @Override public boolean hasContent() { + return data != null; + } + + @Override public InputStream contentAsStream() { + return new FastByteArrayInputStream(data); + } + + @Override public byte[] contentAsBytes() { + return data; + } + + @Override public String contentAsString() { + return Unicode.fromBytes(data); + } + + @Override public Set headerNames() { + return ImmutableSet.of(); + } + + @Override public String header(String name) { + return null; + } + + @Override public List headers(String name) { + return ImmutableList.of(); + } + + @Override public String cookie() { + return null; + } + + @Override public boolean hasParam(String key) { + return params.containsKey(key); + } + + @Override public String param(String key) { + return params.get(key); + } + + @Override public Map params() { + return params; + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java new file mode 100644 index 00000000000..57ed95318cc --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServer.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.settings.Settings; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedServer extends AbstractLifecycleComponent { + + private final MemcachedServerTransport transport; + + private final TransportNodesInfo nodesInfo; + + private final RestController restController; + + @Inject public MemcachedServer(Settings settings, MemcachedServerTransport transport, + RestController restController, TransportNodesInfo nodesInfo) { + super(settings); + this.transport = transport; + this.restController = restController; + this.nodesInfo = nodesInfo; + } + + @Override protected void doStart() throws ElasticSearchException { + transport.start(); + if (logger.isInfoEnabled()) { + logger.info("{}", transport.boundAddress()); + } + nodesInfo.putNodeAttribute("memcached_address", transport.boundAddress().publishAddress().toString()); + } + + @Override protected void doStop() throws ElasticSearchException { + nodesInfo.removeNodeAttribute("memcached_address"); + transport.stop(); + } + + @Override protected void doClose() throws ElasticSearchException { + transport.close(); + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java new file mode 100644 index 00000000000..9c4c6c6df45 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerModule.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import org.elasticsearch.util.Classes; +import org.elasticsearch.util.settings.Settings; + +import static org.elasticsearch.util.guice.ModulesFactory.*; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedServerModule extends AbstractModule { + + private final Settings settings; + + public MemcachedServerModule(Settings settings) { + this.settings = settings; + } + + @SuppressWarnings({"unchecked"}) @Override protected void configure() { + bind(MemcachedServer.class).asEagerSingleton(); + + Class defaultMemcachedServerTransportModule = null; + try { + Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransport"); + defaultMemcachedServerTransportModule = (Class) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.memcached.netty.NettyMemcachedServerTransportModule"); + } catch (ClassNotFoundException e) { + // no netty one, ok... + if (settings.get("memcached.type") == null) { + // no explicit one is configured, bail + return; + } + } + + Class moduleClass = settings.getAsClass("memcached.type", defaultMemcachedServerTransportModule, "org.elasticsearch.memcached.", "MemcachedServerTransportModule"); + createModule(moduleClass, settings).configure(binder()); + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java new file mode 100644 index 00000000000..063c0a2677c --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedServerTransport.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import org.elasticsearch.util.component.LifecycleComponent; +import org.elasticsearch.util.transport.BoundTransportAddress; + +/** + * @author kimchy (shay.banon) + */ +public interface MemcachedServerTransport extends LifecycleComponent { + + BoundTransportAddress boundAddress(); +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java new file mode 100644 index 00000000000..925630cdb70 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/MemcachedTransportException.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached; + +import org.elasticsearch.ElasticSearchException; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedTransportException extends ElasticSearchException { + + public MemcachedTransportException(String msg) { + super(msg); + } + + public MemcachedTransportException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java new file mode 100644 index 00000000000..c46fd6ffa06 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedDispatcher.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import org.elasticsearch.memcached.MemcachedRestRequest; +import org.elasticsearch.rest.RestController; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedDispatcher extends SimpleChannelUpstreamHandler { + + private final RestController restController; + + public MemcachedDispatcher(RestController restController) { + this.restController = restController; + } + + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + MemcachedRestRequest request = (MemcachedRestRequest) e.getMessage(); + restController.dispatchRequest(request, new MemcachedRestChannel(ctx.getChannel(), request)); + super.messageReceived(ctx, e); + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java new file mode 100644 index 00000000000..a5f80597034 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/MemcachedRestChannel.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import org.elasticsearch.memcached.MemcachedRestRequest; +import org.elasticsearch.memcached.MemcachedTransportException; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.util.Bytes; +import org.elasticsearch.util.Unicode; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; + +/** + * @author kimchy (shay.banon) + */ +public class MemcachedRestChannel implements RestChannel { + + public static final ChannelBuffer CRLF = ChannelBuffers.copiedBuffer("\r\n", "US-ASCII"); + private static final ChannelBuffer VALUE = ChannelBuffers.copiedBuffer("VALUE ", "US-ASCII"); + private static final ChannelBuffer EXISTS = ChannelBuffers.copiedBuffer("EXISTS\r\n", "US-ASCII"); + private static final ChannelBuffer NOT_FOUND = ChannelBuffers.copiedBuffer("NOT_FOUND\r\n", "US-ASCII"); + private static final ChannelBuffer NOT_STORED = ChannelBuffers.copiedBuffer("NOT_STORED\r\n", "US-ASCII"); + private static final ChannelBuffer STORED = ChannelBuffers.copiedBuffer("STORED\r\n", "US-ASCII"); + private static final ChannelBuffer DELETED = ChannelBuffers.copiedBuffer("DELETED\r\n", "US-ASCII"); + private static final ChannelBuffer END = ChannelBuffers.copiedBuffer("END\r\n", "US-ASCII"); + private static final ChannelBuffer OK = ChannelBuffers.copiedBuffer("OK\r\n", "US-ASCII"); + private static final ChannelBuffer ERROR = ChannelBuffers.copiedBuffer("ERROR\r\n", "US-ASCII"); + private static final ChannelBuffer CLIENT_ERROR = ChannelBuffers.copiedBuffer("CLIENT_ERROR\r\n", "US-ASCII"); + + private final Channel channel; + + private final MemcachedRestRequest request; + + public MemcachedRestChannel(Channel channel, MemcachedRestRequest request) { + this.channel = channel; + this.request = request; + } + + @Override public void sendResponse(RestResponse response) { + if (response.status().getStatus() >= 500) { + channel.write(ERROR.duplicate()); + } else { + if (request.method() == RestRequest.Method.POST) { + // TODO this is SET, can we send a payload? + channel.write(STORED.duplicate()); + } else if (request.method() == RestRequest.Method.DELETE) { + channel.write(DELETED.duplicate()); + } else { // GET + try { + ChannelBuffer writeBuffer = ChannelBuffers.dynamicBuffer(response.contentLength() + 512); + writeBuffer.writeBytes(VALUE.duplicate()); + writeBuffer.writeBytes(Unicode.fromStringAsBytes(request.uri())); + writeBuffer.writeByte((byte) ' '); + writeBuffer.writeByte((byte) '0'); + writeBuffer.writeByte((byte) ' '); + writeBuffer.writeBytes(Bytes.itoa(response.contentLength())); + writeBuffer.writeByte((byte) '\r'); + writeBuffer.writeByte((byte) '\n'); + writeBuffer.writeBytes(response.content(), 0, response.contentLength()); + writeBuffer.writeByte((byte) '\r'); + writeBuffer.writeByte((byte) '\n'); + writeBuffer.writeBytes(END.duplicate()); + channel.write(writeBuffer); + } catch (Exception e) { + throw new MemcachedTransportException("Failed to write 'get' response", e); + } + } + } + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java new file mode 100644 index 00000000000..45a14c56692 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.http.BindHttpException; +import org.elasticsearch.memcached.MemcachedServerTransport; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.transport.BindTransportException; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.settings.Settings; +import org.elasticsearch.util.transport.BoundTransportAddress; +import org.elasticsearch.util.transport.InetSocketTransportAddress; +import org.elasticsearch.util.transport.PortsRange; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.logging.Slf4JLoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.util.concurrent.DynamicExecutors.*; +import static org.elasticsearch.util.io.HostResolver.*; + +/** + * @author kimchy (shay.banon) + */ +public class NettyMemcachedServerTransport extends AbstractLifecycleComponent implements MemcachedServerTransport { + + static { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory() { + @Override public InternalLogger newInstance(String name) { + return super.newInstance(name.replace("org.jboss.netty.", "netty.")); + } + }); + } + + private final RestController restController; + + private final int workerCount; + + private final String port; + + private final String bindHost; + + private final String publishHost; + + private final Boolean tcpNoDelay; + + private final Boolean tcpKeepAlive; + + private final Boolean reuseAddress; + + private final SizeValue tcpSendBufferSize; + + private final SizeValue tcpReceiveBufferSize; + + private volatile ServerBootstrap serverBootstrap; + + private volatile BoundTransportAddress boundAddress; + + private volatile Channel serverChannel; + + private volatile OpenChannelsHandler serverOpenChannels; + + @Inject public NettyMemcachedServerTransport(Settings settings, RestController restController) { + super(settings); + this.restController = restController; + + this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors()); + this.port = componentSettings.get("port", "11211-11311"); + this.bindHost = componentSettings.get("bind_host"); + this.publishHost = componentSettings.get("publish_host"); + this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true); + this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null); + this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null); + this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null); + this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null); + } + + @Override public BoundTransportAddress boundAddress() { + return boundAddress; + } + + @Override protected void doStart() throws ElasticSearchException { + this.serverOpenChannels = new OpenChannelsHandler(); + + serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedBoss")), + Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedIoWorker")), + workerCount)); + + ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() { + @Override public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("openChannels", serverOpenChannels); + pipeline.addLast("decoder", new TextMemcachedDecoder()); + pipeline.addLast("dispatcher", new MemcachedDispatcher(restController)); + return pipeline; + } + }; + + serverBootstrap.setPipelineFactory(pipelineFactory); + + if (tcpNoDelay != null) { + serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); + } + if (tcpKeepAlive != null) { + serverBootstrap.setOption("child.keepAlive", tcpKeepAlive); + } + if (tcpSendBufferSize != null) { + serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes()); + } + if (tcpReceiveBufferSize != null) { + serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes()); + } + if (reuseAddress != null) { + serverBootstrap.setOption("reuseAddress", reuseAddress); + serverBootstrap.setOption("child.reuseAddress", reuseAddress); + } + + // Bind and start to accept incoming connections. + InetAddress hostAddressX; + try { + hostAddressX = resolveBindHostAddress(bindHost, settings); + } catch (IOException e) { + throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e); + } + final InetAddress hostAddress = hostAddressX; + + PortsRange portsRange = new PortsRange(port); + final AtomicReference lastException = new AtomicReference(); + boolean success = portsRange.iterate(new PortsRange.PortCallback() { + @Override public boolean onPortNumber(int portNumber) { + try { + serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + } + }); + if (!success) { + throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get()); + } + + InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); + InetSocketAddress publishAddress; + try { + InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings); + if (publishAddressX == null) { + // if its 0.0.0.0, we can't publish that.., default to the local ip address + if (boundAddress.getAddress().isAnyLocalAddress()) { + publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort()); + } else { + publishAddress = boundAddress; + } + } else { + publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort()); + } + } catch (Exception e) { + throw new BindTransportException("Failed to resolve publish address", e); + } + this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); + } + + @Override protected void doStop() throws ElasticSearchException { + if (serverChannel != null) { + serverChannel.close().awaitUninterruptibly(); + serverChannel = null; + } + + if (serverOpenChannels != null) { + serverOpenChannels.close(); + serverOpenChannels = null; + } + + if (serverBootstrap != null) { + serverBootstrap.releaseExternalResources(); + serverBootstrap = null; + } + } + + @Override protected void doClose() throws ElasticSearchException { + } +} diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java new file mode 100644 index 00000000000..2c210ce8b6f --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransportModule.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import com.google.inject.AbstractModule; +import org.elasticsearch.memcached.MemcachedServerTransport; + +/** + * @author kimchy (shay.banon) + */ +public class NettyMemcachedServerTransportModule extends AbstractModule { + + @Override protected void configure() { + bind(MemcachedServerTransport.class).to(NettyMemcachedServerTransport.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java new file mode 100644 index 00000000000..45793d5071a --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashSet; +import org.jboss.netty.channel.*; + +/** + * @author kimchy (shay.banon) + */ +@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL) +public class OpenChannelsHandler implements ChannelUpstreamHandler { + + private NonBlockingHashSet openChannels = new NonBlockingHashSet(); + + private final ChannelFutureListener remover = new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { + openChannels.remove(future.getChannel()); + } + }; + + @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { + if (e instanceof ChannelStateEvent) { + ChannelStateEvent evt = (ChannelStateEvent) e; + if (evt.getState() == ChannelState.OPEN) { + boolean added = openChannels.add(ctx.getChannel()); + if (added) { + ctx.getChannel().getCloseFuture().addListener(remover); + } + } + } + ctx.sendUpstream(e); + } + + public void close() { + for (Channel channel : openChannels) { + channel.close().awaitUninterruptibly(); + } + } +} \ No newline at end of file diff --git a/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java new file mode 100644 index 00000000000..8993c298a44 --- /dev/null +++ b/plugins/memcached/src/main/java/org/elasticsearch/memcached/netty/TextMemcachedDecoder.java @@ -0,0 +1,151 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import org.elasticsearch.memcached.MemcachedRestRequest; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.util.Unicode; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +import java.io.StreamCorruptedException; +import java.util.regex.Pattern; + +/** + * @author kimchy (shay.banon) + */ +public class TextMemcachedDecoder extends FrameDecoder { + + private final Pattern lineSplit = Pattern.compile(" +"); + + public static final byte CR = 13; + public static final byte LF = 10; + public static final byte[] CRLF = new byte[]{CR, LF}; + + public static enum Reply { + STORED, NOT_STORED, EXISTS, NOT_FOUND, DELETED, STAT, VALUE, END, OK, VERSION, + ERROR, CLIENT_ERROR, SERVER_ERROR; + + private final byte[] bytes; + + Reply() { + this.bytes = this.toString().getBytes(); + } + + public byte[] bytes() { + return bytes; + } + } + + private volatile StringBuffer sb = new StringBuffer(); + + private volatile MemcachedRestRequest request; + + public TextMemcachedDecoder() { + super(false); + } + + @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + MemcachedRestRequest request = this.request; + if (request == null) { + buffer.markReaderIndex(); + // need to read a header + boolean done = false; + StringBuffer sb = this.sb; + int readableBytes = buffer.readableBytes(); + for (int i = 0; i < readableBytes; i++) { + byte next = buffer.readByte(); + if (next == CR) { + next = buffer.readByte(); + if (next == LF) { + done = true; + break; + } + } else if (next == LF) { + done = true; + break; + } else { + sb.append((char) next); + } + } + if (!done) { + buffer.resetReaderIndex(); + return null; + } + + String[] args = lineSplit.split(sb); + // we read the text, clear it + sb.setLength(0); + + String cmd = args[0]; + String uri = args[1]; + if ("get".equals(cmd)) { + request = new MemcachedRestRequest(RestRequest.Method.GET, uri, -1, false); + if (args.length > 3) { + request.setData(Unicode.fromStringAsBytes(args[2])); + } + return request; + } else if ("delete".equals(cmd)) { + request = new MemcachedRestRequest(RestRequest.Method.DELETE, uri, -1, false); +// if (args.length > 3) { +// request.setData(Unicode.fromStringAsBytes(args[2])); +// } + return request; + } else if ("set".equals(cmd)) { + this.request = new MemcachedRestRequest(RestRequest.Method.POST, uri, Integer.parseInt(args[4]), false); + buffer.markReaderIndex(); + } else if ("quit".equals(cmd)) { + channel.disconnect(); + } + } else { + if (buffer.readableBytes() < (request.getDataSize() + 2)) { + return null; + } + byte[] data = new byte[request.getDataSize()]; + buffer.readBytes(data, 0, data.length); + byte next = buffer.readByte(); + if (next == CR) { + next = buffer.readByte(); + if (next == LF) { + request.setData(data); + // reset + this.request = null; + return request; + } else { + this.request = null; + throw new StreamCorruptedException("Expecting \r\n after data block"); + } + } else { + this.request = null; + throw new StreamCorruptedException("Expecting \r\n after data block"); + } + } + return null; + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { + this.request = null; + sb.setLength(0); + e.getCause().printStackTrace(); + } +} diff --git a/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java b/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java new file mode 100644 index 00000000000..6379deef245 --- /dev/null +++ b/plugins/memcached/src/test/java/org/elasticsearch/memcached/test/SimpleMemcachedActionsTests.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.test; + +import net.spy.memcached.MemcachedClient; +import org.elasticsearch.node.Node; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.node.NodeBuilder.*; +import static org.elasticsearch.util.json.JsonBuilder.*; + +/** + * @author kimchy (shay.banon) + */ +@Test +public class SimpleMemcachedActionsTests { + + private Node node; + + private MemcachedClient memcachedClient; + + @BeforeMethod + public void setup() throws IOException { + node = nodeBuilder().node(); + +// NodesInfoResponse nodesInfo = node.client().admin().cluster().nodesInfo(nodesInfo()).actionGet(); + memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211)); + } + + @AfterMethod + public void tearDown() { + memcachedClient.shutdown(); + node.close(); + } + + @Test public void testSimpleOperations() throws Exception { + Future setResult = memcachedClient.set("/test/person/1", 0, binaryJsonBuilder().startObject().field("test", "value").endObject().copiedBytes()); + setResult.get(10, TimeUnit.SECONDS); + + String getResult = (String) memcachedClient.get("/_refresh"); + System.out.println("REFRESH " + getResult); + + getResult = (String) memcachedClient.get("/test/person/1"); + System.out.println("GET " + getResult); + + Future deleteResult = memcachedClient.delete("/test/person/1"); + deleteResult.get(10, TimeUnit.SECONDS); + + getResult = (String) memcachedClient.get("/_refresh"); + System.out.println("REFRESH " + getResult); + + getResult = (String) memcachedClient.get("/test/person/1"); + System.out.println("GET " + getResult); + } +} diff --git a/settings.gradle b/settings.gradle index c7fbb21f713..32376c70e9f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ include 'benchmark-micro' include 'plugins-attachments' include 'plugins-groovy' +include 'plugins-memcached' rootProject.name = 'elasticsearch-root' rootProject.children.each {project ->