Watcher: Clean up email server (elastic/elasticsearch#4163)

* Do not try bind to port range but use free ephemeral port
* Start a new email server in all tests, do not use static one
* Remove selection of username/password, as it was static anyway
* Remove Listener.Handle class, as it is not needed, when not running in static context

Original commit: elastic/x-pack-elasticsearch@8816cc25f6
This commit is contained in:
Alexander Reelsen 2016-11-23 18:19:02 +01:00 committed by GitHub
parent ddddee1e1f
commit d53dbe5283
5 changed files with 75 additions and 168 deletions

View File

@ -17,7 +17,6 @@ import org.junit.Before;
import javax.mail.Address;
import javax.mail.Message;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -30,14 +29,12 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class AccountTests extends ESTestCase {
static final String USERNAME = "_user";
static final String PASSWORD = "_passwd";
private EmailServer server;
@Before
public void init() throws Exception {
server = EmailServer.localhost("2500-2600", USERNAME, PASSWORD, logger);
server = EmailServer.localhost(logger);
}
@After
@ -163,8 +160,8 @@ public class AccountTests extends ESTestCase {
Account account = new Account(new Account.Config("default", Settings.builder()
.put("smtp.host", "localhost")
.put("smtp.port", server.port())
.put("smtp.user", USERNAME)
.put("smtp.password", PASSWORD)
.put("smtp.user", EmailServer.USERNAME)
.put("smtp.password", EmailServer.PASSWORD)
.build()), null, logger);
Email email = Email.builder()
@ -176,7 +173,7 @@ public class AccountTests extends ESTestCase {
.build();
final CountDownLatch latch = new CountDownLatch(1);
EmailServer.Listener.Handle handle = server.addListener(message -> {
server.addListener(message -> {
assertThat(message.getFrom().length, is(1));
assertThat(message.getFrom()[0], equalTo(new InternetAddress("from@domain.com")));
assertThat(message.getRecipients(Message.RecipientType.TO).length, is(1));
@ -192,16 +189,14 @@ public class AccountTests extends ESTestCase {
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting for email too long");
}
handle.remove();
}
public void testSendCCAndBCC() throws Exception {
Account account = new Account(new Account.Config("default", Settings.builder()
.put("smtp.host", "localhost")
.put("smtp.port", server.port())
.put("smtp.user", USERNAME)
.put("smtp.password", PASSWORD)
.put("smtp.user", EmailServer.USERNAME)
.put("smtp.password", EmailServer.PASSWORD)
.build()), null, logger);
Email email = Email.builder()
@ -214,7 +209,7 @@ public class AccountTests extends ESTestCase {
.build();
final CountDownLatch latch = new CountDownLatch(5);
EmailServer.Listener.Handle handle = server.addListener(message -> {
server.addListener(message -> {
assertThat(message.getFrom().length, is(1));
assertThat(message.getFrom()[0], equalTo(new InternetAddress("from@domain.com")));
assertThat(message.getRecipients(Message.RecipientType.TO).length, is(1));
@ -234,8 +229,6 @@ public class AccountTests extends ESTestCase {
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting for email too long");
}
handle.remove();
}
public void testSendAuthentication() throws Exception {
@ -253,20 +246,13 @@ public class AccountTests extends ESTestCase {
.build();
final CountDownLatch latch = new CountDownLatch(1);
EmailServer.Listener.Handle handle = server.addListener(new EmailServer.Listener() {
@Override
public void on(MimeMessage message) throws Exception {
latch.countDown();
}
});
server.addListener(message -> latch.countDown());
account.send(email, new Authentication(USERNAME, new Secret(PASSWORD.toCharArray())), Profile.STANDARD);
account.send(email, new Authentication(EmailServer.USERNAME, new Secret(EmailServer.PASSWORD.toCharArray())), Profile.STANDARD);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting for email too long");
}
handle.remove();
}
public void testDefaultAccountTimeout() {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.notification.email;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.notification.email.support.EmailServer;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
@ -19,12 +20,10 @@ import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.elasticsearch.xpack.notification.email.support.EmailServer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import javax.mail.internet.MimeMessage;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -40,12 +39,16 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTestCase {
static final String USERNAME = "_user";
static final String PASSWORD = "_passwd";
private EmailServer server;
private Boolean encryptSensitiveData;
@Override
public void setUp() throws Exception {
super.setUp();
server = EmailServer.localhost(logger);
}
@After
public void cleanup() throws Exception {
server.stop();
@ -53,10 +56,6 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
@Override
protected Settings nodeSettings(int nodeOrdinal) {
if(server == null) {
//Need to construct the Email Server here as this happens before init()
server = EmailServer.localhost("2500-2600", USERNAME, PASSWORD, logger);
}
if (encryptSensitiveData == null) {
encryptSensitiveData = securityEnabled() && randomBoolean();
}
@ -81,7 +80,7 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
.from("_from")
.to("_to")
.subject("_subject"))
.setAuthentication(USERNAME, PASSWORD.toCharArray())))
.setAuthentication(EmailServer.USERNAME, EmailServer.PASSWORD.toCharArray())))
.get();
// verifying the email password is stored encrypted in the index
@ -92,11 +91,11 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
Object value = XContentMapValues.extractValue("actions._email.email.password", source);
assertThat(value, notNullValue());
if (securityEnabled() && encryptSensitiveData) {
assertThat(value, not(is((Object) PASSWORD)));
assertThat(value, not(is(EmailServer.PASSWORD)));
CryptoService cryptoService = getInstanceFromMaster(CryptoService.class);
assertThat(new String(cryptoService.decrypt(((String) value).toCharArray())), is(PASSWORD));
assertThat(new String(cryptoService.decrypt(((String) value).toCharArray())), is(EmailServer.PASSWORD));
} else {
assertThat(value, is((Object) PASSWORD));
assertThat(value, is(EmailServer.PASSWORD));
}
// verifying the password is not returned by the GET watch API
@ -114,12 +113,9 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
// now lets execute the watch manually
final CountDownLatch latch = new CountDownLatch(1);
server.addListener(new EmailServer.Listener() {
@Override
public void on(MimeMessage message) throws Exception {
assertThat(message.getSubject(), is("_subject"));
latch.countDown();
}
server.addListener(message -> {
assertThat(message.getSubject(), is("_subject"));
latch.countDown();
});
TriggerEvent triggerEvent = new ScheduleTriggerEvent(new DateTime(DateTimeZone.UTC), new DateTime(DateTimeZone.UTC));
@ -132,7 +128,7 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
contentSource = executeResponse.getRecordSource();
value = contentSource.getValue("result.actions.0.status");
assertThat((String) value, is("success"));
assertThat(value, is("success"));
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for the email to be sent");

View File

@ -6,14 +6,7 @@
package org.elasticsearch.xpack.notification.email.support;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.transport.PortsRange;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.auth.EasyAuthenticationHandlerFactory;
import org.subethamail.smtp.auth.LoginFailedException;
import org.subethamail.smtp.auth.UsernamePasswordValidator;
import org.subethamail.smtp.helper.SimpleMessageListener;
import org.subethamail.smtp.helper.SimpleMessageListenerAdapter;
import org.subethamail.smtp.server.SMTPServer;
@ -23,11 +16,9 @@ import javax.mail.Session;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.io.InputStream;
import java.net.BindException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@ -35,16 +26,16 @@ import static org.junit.Assert.fail;
/**
* An mini email smtp server that can be used for unit testing
*
*
*/
public class EmailServer {
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
public static final String USERNAME = "_user";
public static final String PASSWORD = "_passwd";
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final SMTPServer server;
public EmailServer(String host, int port, final String username, final String password, final Logger logger) {
public EmailServer(String host, final Logger logger) {
server = new SMTPServer(new SimpleMessageListenerAdapter(new SimpleMessageListener() {
@Override
public boolean accept(String from, String recipient) {
@ -52,7 +43,7 @@ public class EmailServer {
}
@Override
public void deliver(String from, String recipient, InputStream data) throws TooMuchDataException, IOException {
public void deliver(String from, String recipient, InputStream data) throws IOException {
try {
Session session = Session.getInstance(new Properties());
MimeMessage msg = new MimeMessage(session, data);
@ -68,15 +59,12 @@ public class EmailServer {
throw new RuntimeException("could not create mime message", me);
}
}
}), new EasyAuthenticationHandlerFactory(new UsernamePasswordValidator() {
@Override
public void login(String user, String passwd) throws LoginFailedException {
assertThat(user, is(username));
assertThat(passwd, is(password));
}
}), new EasyAuthenticationHandlerFactory((user, passwd) -> {
assertThat(user, is(USERNAME));
assertThat(passwd, is(PASSWORD));
}));
server.setHostName(host);
server.setPort(port);
server.setPort(0);
}
/**
@ -95,58 +83,18 @@ public class EmailServer {
listeners.clear();
}
public Listener.Handle addListener(Listener listener) {
public void addListener(Listener listener) {
listeners.add(listener);
return new Listener.Handle(listeners, listener);
}
public static EmailServer localhost(String portRangeStr, final String username, final String password, final Logger logger) {
final AtomicReference<EmailServer> emailServer = new AtomicReference<>();
boolean bound = new PortsRange(portRangeStr).iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int port) {
try {
EmailServer server = new EmailServer("localhost", port, username, password, logger);
server.start();
emailServer.set(server);
return true;
} catch (RuntimeException re) {
if (re.getCause() instanceof BindException) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage("port [{}] was already in use trying next port", port), re);
return false;
} else {
throw re;
}
}
}
});
if (!bound || emailServer.get() == null) {
throw new ElasticsearchException("could not bind to any of the port in [" + portRangeStr + "]");
}
return emailServer.get();
public static EmailServer localhost(final Logger logger) {
EmailServer server = new EmailServer("localhost", logger);
server.start();
return server;
}
@FunctionalInterface
public interface Listener {
void on(MimeMessage message) throws Exception;
class Handle {
private final List<Listener> listeners;
private final Listener listener;
Handle(List<Listener> listeners, Listener listener) {
this.listeners = listeners;
this.listener = listener;
}
public void remove() {
listeners.remove(listener);
}
}
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateReque
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.junit.After;
import org.junit.Before;
import javax.mail.BodyPart;
import javax.mail.Multipart;
@ -61,21 +60,22 @@ import static org.hamcrest.Matchers.startsWith;
public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
static final String USERNAME = "_user";
static final String PASSWORD = "_passwd";
private MockWebServer webServer = new MockWebServer();;
private MockWebServer webServer = new MockWebServer();
private QueueDispatcher dispatcher = new QueueDispatcher();
private MockResponse mockResponse = new MockResponse().setResponseCode(200)
.addHeader("Content-Type", "application/foo").setBody("This is the content");
private EmailServer server;
@Before
public void startWebservice() throws Exception {
QueueDispatcher dispatcher = new QueueDispatcher();
@Override
public void setUp() throws Exception {
super.setUp();
dispatcher.setFailFast(true);
webServer.setDispatcher(dispatcher);
webServer.start();
MockResponse mockResponse = new MockResponse().setResponseCode(200)
.addHeader("Content-Type", "application/foo").setBody("This is the content");
webServer.enqueue(mockResponse);
webServer.start();
server = EmailServer.localhost(logger);
}
@After
@ -86,15 +86,11 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
if(server == null) {
//Need to construct the Email Server here as this happens before init()
server = EmailServer.localhost("2500-2600", USERNAME, PASSWORD, logger);
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("xpack.notification.email.account.test.smtp.auth", true)
.put("xpack.notification.email.account.test.smtp.user", USERNAME)
.put("xpack.notification.email.account.test.smtp.password", PASSWORD)
.put("xpack.notification.email.account.test.smtp.user", EmailServer.USERNAME)
.put("xpack.notification.email.account.test.smtp.password", EmailServer.PASSWORD)
.put("xpack.notification.email.account.test.smtp.port", server.port())
.put("xpack.notification.email.account.test.smtp.host", "localhost")
.build();
@ -143,19 +139,16 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
public void testThatEmailAttachmentsAreSent() throws Exception {
DataAttachment dataFormat = randomFrom(JSON, YAML);
final CountDownLatch latch = new CountDownLatch(1);
server.addListener(new EmailServer.Listener() {
@Override
public void on(MimeMessage message) throws Exception {
assertThat(message.getSubject(), equalTo("Subject"));
List<String> attachments = getAttachments(message);
if (dataFormat == YAML) {
assertThat(attachments, hasItem(allOf(startsWith("---"), containsString("_test_id"))));
} else {
assertThat(attachments, hasItem(allOf(startsWith("{"), containsString("_test_id"))));
}
assertThat(attachments, hasItem(containsString("This is the content")));
latch.countDown();
server.addListener(message -> {
assertThat(message.getSubject(), equalTo("Subject"));
List<String> attachments = getAttachments(message);
if (dataFormat == YAML) {
assertThat(attachments, hasItem(allOf(startsWith("---"), containsString("_test_id"))));
} else {
assertThat(attachments, hasItem(allOf(startsWith("{"), containsString("_test_id"))));
}
assertThat(attachments, hasItem(containsString("This is the content")));
latch.countDown();
});
WatcherClient watcherClient = watcherClient();
@ -189,7 +182,7 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(request))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.addAction("_email", emailAction(emailBuilder).setAuthentication(USERNAME, PASSWORD.toCharArray())
.addAction("_email", emailAction(emailBuilder).setAuthentication(EmailServer.USERNAME, EmailServer.PASSWORD.toCharArray())
.setAttachments(emailAttachments));
logger.info("TMP WATCHSOURCE {}", watchSourceBuilder.build().getBytes().utf8ToString());

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.history;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -16,8 +15,7 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.After;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -35,35 +33,22 @@ import static org.hamcrest.Matchers.notNullValue;
*/
public class HistoryTemplateEmailMappingsTests extends AbstractWatcherIntegrationTestCase {
private static final String USERNAME = "_user";
private static final String PASSWORD = "_passwd";
private EmailServer server;
private static EmailServer server;
@Override
public void setUp() throws Exception {
super.setUp();
server = EmailServer.localhost(logger);
}
@AfterClass
public static void cleanup() throws Exception {
if (server != null) {
server.stop();
}
server = null;
@After
public void cleanup() throws Exception {
server.stop();
}
@Override
protected boolean timeWarped() {
return true; // just to have better control over the triggers
}
@Override
protected boolean enableSecurity() {
return false; // remove security noise from this test
}
@BeforeClass
public static void setupEmailServer() {
if (server == null) {
//Need to construct the Email Server here as this happens before init()
server = EmailServer.localhost("2500-2600", USERNAME, PASSWORD, Loggers.getLogger(HistoryTemplateTimeMappingsTests.class));
}
return true;
}
@Override
@ -73,8 +58,8 @@ public class HistoryTemplateEmailMappingsTests extends AbstractWatcherIntegratio
// email
.put("xpack.notification.email.account.test.smtp.auth", true)
.put("xpack.notification.email.account.test.smtp.user", USERNAME)
.put("xpack.notification.email.account.test.smtp.password", PASSWORD)
.put("xpack.notification.email.account.test.smtp.user", EmailServer.USERNAME)
.put("xpack.notification.email.account.test.smtp.password", EmailServer.PASSWORD)
.put("xpack.notification.email.account.test.smtp.port", server.port())
.put("xpack.notification.email.account.test.smtp.host", "localhost")
@ -155,5 +140,4 @@ public class HistoryTemplateEmailMappingsTests extends AbstractWatcherIntegratio
assertThat(terms.getBucketByKey("rt2@example.com"), notNullValue());
assertThat(terms.getBucketByKey("rt2@example.com").getDocCount(), is(1L));
}
}