diff --git a/apis/sqs/src/main/java/org/jclouds/sqs/SQS.java b/apis/sqs/src/main/java/org/jclouds/sqs/SQS.java new file mode 100644 index 0000000000..80e4936434 --- /dev/null +++ b/apis/sqs/src/main/java/org/jclouds/sqs/SQS.java @@ -0,0 +1,84 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.sqs; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.jclouds.collect.AdvanceUntilEmptyIterable; +import org.jclouds.javax.annotation.Nullable; +import org.jclouds.sqs.domain.Message; +import org.jclouds.sqs.features.MessageApi; +import org.jclouds.sqs.options.ReceiveMessageOptions; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.collect.FluentIterable; + +/** + * Utilities for interacting with SQS + * + * @author Adrian Cole + */ +@Beta +public class SQS { + + /** + * Returns an iterable that lazy fetches messages until there are none left. + * Note that this method will make multiple network calls. + * + * @param api + * api targeted at the queue in question + * @param messagesPerPage + * how many messages to receive per request (current max: 10) + * @param options + * controls attributes and visibility options + * @return an iterable that lazy fetches messages until there are none left + */ + public static FluentIterable receiveAllAtRate(MessageApi api, int messagesPerPage, + ReceiveMessageOptions options) { + return AdvanceUntilEmptyIterable.create(new MoreMessages(api, messagesPerPage, options)).concat(); + } + + /** + * returns another response of messages on {@link MoreMessages#get} + * + */ + private static class MoreMessages implements Supplier> { + + private static final ReceiveMessageOptions NO_OPTIONS = new ReceiveMessageOptions(); + private MessageApi api; + private int max; + private ReceiveMessageOptions options; + + private MoreMessages(MessageApi api, int max, @Nullable ReceiveMessageOptions options) { + this.api = checkNotNull(api, "message api"); + checkState(max > 0, "max messages per request must be a positive number"); + this.max = max; + this.options = Optional.fromNullable(options).or(NO_OPTIONS); + } + + @Override + public FluentIterable get() { + return api.receive(max, options); + } + + } +} diff --git a/apis/sqs/src/main/java/org/jclouds/sqs/features/Messages.java b/apis/sqs/src/main/java/org/jclouds/sqs/features/Messages.java new file mode 100644 index 0000000000..7c9813af26 --- /dev/null +++ b/apis/sqs/src/main/java/org/jclouds/sqs/features/Messages.java @@ -0,0 +1,52 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.sqs.features; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.jclouds.sqs.domain.Message; + +import com.google.common.base.Function; + +/** + * Utilities for {@link Message}s + * + * @author Adrian Cole + */ +public class Messages { + public static Function toReceiptHandle() { + return ToReceiptHandleFunction.INSTANCE; + } + + // enum singleton pattern + private enum ToReceiptHandleFunction implements Function { + INSTANCE; + + @Override + public String apply(Message o) { + return checkNotNull(o, "message").getReceiptHandle(); + } + + @Override + public String toString() { + return "toReceiptHandle"; + } + } + +} diff --git a/apis/sqs/src/test/java/org/jclouds/sqs/SQSTest.java b/apis/sqs/src/test/java/org/jclouds/sqs/SQSTest.java new file mode 100644 index 0000000000..26b31b16c6 --- /dev/null +++ b/apis/sqs/src/test/java/org/jclouds/sqs/SQSTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.sqs; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; + +import org.easymock.EasyMock; +import org.jclouds.sqs.domain.Message; +import org.jclouds.sqs.features.MessageApi; +import org.jclouds.sqs.options.ReceiveMessageOptions; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +/** + * Tests behavior of {@code SQS}. + * + * @author Adrian Cole + */ +@Test(testName = "SQSTest", singleThreaded = true) +public class SQSTest { + + /** + * Tests {@link SQS#receiveAllAtRate} where a single response returns all + * results. + */ + @Test + public void testSinglePageResult() throws Exception { + MessageApi messageClient = createMock(MessageApi.class); + ReceiveMessageOptions options = new ReceiveMessageOptions(); + FluentIterable aMessage = FluentIterable.from(ImmutableSet.of(createMock(Message.class))); + FluentIterable noMessages = FluentIterable.from(ImmutableSet.of()); + + expect(messageClient.receive(1, options)) + .andReturn(aMessage) + .once(); + + expect(messageClient.receive(1, options)) + .andReturn(noMessages) + .once(); + + EasyMock.replay(messageClient); + + Assert.assertEquals(1, Iterables.size(SQS.receiveAllAtRate(messageClient, 1, options))); + } + + /** + * Tests {@link SQS#receiveAllAtRate} where retrieving all results requires multiple requests. + */ + @Test + public void testMultiPageResult() throws Exception { + MessageApi messageClient = createMock(MessageApi.class); + ReceiveMessageOptions options = new ReceiveMessageOptions(); + FluentIterable aMessage = FluentIterable.from(ImmutableSet.of(createMock(Message.class))); + FluentIterable noMessages = FluentIterable.from(ImmutableSet.of()); + + expect(messageClient.receive(1, options)) + .andReturn(aMessage) + .times(2); + expect(messageClient.receive(1, options)) + .andReturn(noMessages) + .once(); + + EasyMock.replay(messageClient); + + Assert.assertEquals(2, Iterables.size(SQS.receiveAllAtRate(messageClient, 1, options))); + } +} diff --git a/apis/sqs/src/test/java/org/jclouds/sqs/features/BulkMessageApiLiveTest.java b/apis/sqs/src/test/java/org/jclouds/sqs/features/BulkMessageApiLiveTest.java index 9e0fe24c8a..3c9ad94259 100644 --- a/apis/sqs/src/test/java/org/jclouds/sqs/features/BulkMessageApiLiveTest.java +++ b/apis/sqs/src/test/java/org/jclouds/sqs/features/BulkMessageApiLiveTest.java @@ -26,13 +26,13 @@ import java.net.URI; import java.util.Map.Entry; import java.util.Set; +import org.jclouds.sqs.SQS; import org.jclouds.sqs.domain.BatchResult; import org.jclouds.sqs.domain.Message; import org.jclouds.sqs.domain.MessageIdAndMD5; import org.jclouds.sqs.internal.BaseSQSApiLiveTest; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import org.testng.internal.annotations.Sets; import com.google.common.base.Charsets; import com.google.common.base.Function; @@ -111,11 +111,7 @@ public class BulkMessageApiLiveTest extends BaseSQSApiLiveTest { } protected Set collectMessages(MessageApi api) { - // you are not guaranteed to get all messages in the same request - Set messages = Sets.newLinkedHashSet(); - while (messages.size() != idPayload.size()) - messages.addAll(api.receive(idPayload.size(), attribute("None").visibilityTimeout(5)).toImmutableSet()); - return messages; + return SQS.receiveAllAtRate(api, idPayload.size(), attribute("None").visibilityTimeout(5)).toImmutableSet(); } @Test(dependsOnMethods = "testChangeMessageVisibility") diff --git a/apis/sqs/src/test/java/org/jclouds/sqs/features/QueueApiLiveTest.java b/apis/sqs/src/test/java/org/jclouds/sqs/features/QueueApiLiveTest.java index 1838163574..0d98c06c2a 100644 --- a/apis/sqs/src/test/java/org/jclouds/sqs/features/QueueApiLiveTest.java +++ b/apis/sqs/src/test/java/org/jclouds/sqs/features/QueueApiLiveTest.java @@ -69,10 +69,17 @@ public class QueueApiLiveTest extends BaseSQSApiLiveTest { recreateQueueInRegion(prefix, null); } + @Test(dependsOnMethods = "testCanRecreateQueueGracefully") + public void testCreateQueueWhenAlreadyExistsReturnsURI() { + for (URI queue : queues) { + assertEquals(api().getQueueApi().create(prefix), queue); + } + } + @Test(dependsOnMethods = "testCanRecreateQueueGracefully") public void testGet() { for (URI queue : queues) { - assertEquals(queue, api().getQueueApi().get(prefix)); + assertEquals(api().getQueueApi().get(prefix), queue); } } diff --git a/apis/sqs/src/test/java/org/jclouds/sqs/internal/BaseSQSApiLiveTest.java b/apis/sqs/src/test/java/org/jclouds/sqs/internal/BaseSQSApiLiveTest.java index e2384e2197..1ebe3b622a 100644 --- a/apis/sqs/src/test/java/org/jclouds/sqs/internal/BaseSQSApiLiveTest.java +++ b/apis/sqs/src/test/java/org/jclouds/sqs/internal/BaseSQSApiLiveTest.java @@ -19,8 +19,6 @@ package org.jclouds.sqs.internal; import static com.google.common.collect.Iterables.get; -import static com.google.common.collect.Iterables.getLast; -import static org.jclouds.sqs.options.ListQueuesOptions.Builder.queuePrefix; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -67,9 +65,9 @@ public class BaseSQSApiLiveTest extends BaseContextLiveTest result = api.list(queuePrefix(queueName)); - if (result.size() >= 1) { - api.delete(getLast(result)); + URI result = api.get(queueName); + if (result != null) { + api.delete(result); } URI queue = api.create(queueName); assertQueueInList(region, queue);