BAEL-3768 - spatialguru.net@gmail.com (#9385)
* Add implementation of BeanPostProcessor and BeanFactoryPostProcessor for ticket * Continue processing beans if annotation not found on one or more * Add integration test * Simplify code
This commit is contained in:
parent
6cd7bfba80
commit
135c3160ac
@ -24,6 +24,16 @@
|
||||
<artifactId>spring-core</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-expression</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>28.2-jre</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
@ -42,6 +52,18 @@
|
||||
<version>${junit-jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>4.0.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>2.9.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,39 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import com.google.common.eventbus.AsyncEventBus;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@SuppressWarnings("ALL")
|
||||
public final class GlobalEventBus {
|
||||
|
||||
public static final String GLOBAL_EVENT_BUS_EXPRESSION = "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()";
|
||||
|
||||
private static final String IDENTIFIER = "global-event-bus";
|
||||
|
||||
private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
|
||||
|
||||
private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());
|
||||
|
||||
private GlobalEventBus() {
|
||||
}
|
||||
|
||||
public static GlobalEventBus getInstance() {
|
||||
return GlobalEventBus.GLOBAL_EVENT_BUS;
|
||||
}
|
||||
|
||||
public static EventBus getEventBus() {
|
||||
return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
|
||||
}
|
||||
|
||||
public static void subscribe(Object obj) {
|
||||
getEventBus().register(obj);
|
||||
}
|
||||
public static void unsubscribe(Object obj) {
|
||||
getEventBus().unregister(obj);
|
||||
}
|
||||
public static void post(Object event) {
|
||||
getEventBus().post(event);
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.FatalBeanException;
|
||||
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.ExpressionException;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
|
||||
@SuppressWarnings("ALL")
|
||||
public class GuavaEventBusBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final SpelExpressionParser expressionParser = new SpelExpressionParser();
|
||||
|
||||
@Override
|
||||
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
|
||||
for (Iterator<String> names = beanFactory.getBeanNamesIterator(); names.hasNext(); ) {
|
||||
Object proxy = this.getTargetObject(beanFactory.getBean(names.next()));
|
||||
final Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
|
||||
if (annotation == null)
|
||||
continue;
|
||||
this.logger.info("{}: processing bean of type {} during initialization", this.getClass().getSimpleName(),
|
||||
proxy.getClass().getName());
|
||||
final String annotationValue = annotation.value();
|
||||
try {
|
||||
final Expression expression = this.expressionParser.parseExpression(annotationValue);
|
||||
final Object value = expression.getValue();
|
||||
if (!(value instanceof EventBus)) {
|
||||
this.logger.error("{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
|
||||
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
|
||||
return;
|
||||
}
|
||||
final EventBus eventBus = (EventBus)value;
|
||||
eventBus.register(proxy);
|
||||
} catch (ExpressionException ex) {
|
||||
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}", this.getClass().getSimpleName(),
|
||||
annotationValue, proxy.getClass().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Object getTargetObject(Object proxy) throws BeansException {
|
||||
if (AopUtils.isJdkDynamicProxy(proxy)) {
|
||||
try {
|
||||
return ((Advised)proxy).getTargetSource().getTarget();
|
||||
} catch (Exception e) {
|
||||
throw new FatalBeanException("Error getting target of JDK proxy", e);
|
||||
}
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.FatalBeanException;
|
||||
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.ExpressionException;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
|
||||
/**
|
||||
* A {@link DestructionAwareBeanPostProcessor} which registers/un-registers subscribers to a Guava {@link EventBus}. The class must
|
||||
* be annotated with {@link Subscriber} and each subscribing method must be annotated with
|
||||
* {@link com.google.common.eventbus.Subscribe}.
|
||||
*/
|
||||
@SuppressWarnings("ALL")
|
||||
public class GuavaEventBusBeanPostProcessor implements DestructionAwareBeanPostProcessor {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
private final SpelExpressionParser expressionParser = new SpelExpressionParser();
|
||||
|
||||
@Override
|
||||
public void postProcessBeforeDestruction(final Object bean, final String beanName) throws BeansException {
|
||||
this.process(bean, EventBus::unregister, "destruction");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requiresDestruction(Object bean) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
|
||||
this.process(bean, EventBus::register, "initialization");
|
||||
return bean;
|
||||
}
|
||||
|
||||
private void process(final Object bean, final BiConsumer<EventBus, Object> consumer, final String action) {
|
||||
Object proxy = this.getTargetObject(bean);
|
||||
final Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
|
||||
if (annotation == null)
|
||||
return;
|
||||
this.logger.info("{}: processing bean of type {} during {}", this.getClass().getSimpleName(), proxy.getClass().getName(),
|
||||
action);
|
||||
final String annotationValue = annotation.value();
|
||||
try {
|
||||
final Expression expression = this.expressionParser.parseExpression(annotationValue);
|
||||
final Object value = expression.getValue();
|
||||
if (!(value instanceof EventBus)) {
|
||||
this.logger.error("{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
|
||||
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
|
||||
return;
|
||||
}
|
||||
final EventBus eventBus = (EventBus)value;
|
||||
consumer.accept(eventBus, proxy);
|
||||
} catch (ExpressionException ex) {
|
||||
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}", this.getClass().getSimpleName(),
|
||||
annotationValue, proxy.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private Object getTargetObject(Object proxy) throws BeansException {
|
||||
if (AopUtils.isJdkDynamicProxy(proxy)) {
|
||||
try {
|
||||
return ((Advised)proxy).getTargetSource().getTarget();
|
||||
} catch (Exception e) {
|
||||
throw new FatalBeanException("Error getting target of JDK proxy", e);
|
||||
}
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,34 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
public class StockTrade {
|
||||
|
||||
private final String symbol;
|
||||
private final int quantity;
|
||||
private final double price;
|
||||
private final Date tradeDate;
|
||||
|
||||
public StockTrade(String symbol, int quantity, double price, Date tradeDate) {
|
||||
this.symbol = symbol;
|
||||
this.quantity = quantity;
|
||||
this.price = price;
|
||||
this.tradeDate = tradeDate;
|
||||
}
|
||||
|
||||
public String getSymbol() {
|
||||
return this.symbol;
|
||||
}
|
||||
|
||||
public int getQuantity() {
|
||||
return this.quantity;
|
||||
}
|
||||
|
||||
public double getPrice() {
|
||||
return this.price;
|
||||
}
|
||||
|
||||
public Date getTradeDate() {
|
||||
return this.tradeDate;
|
||||
}
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface StockTradeListener {
|
||||
|
||||
void stockTradePublished(StockTrade trade);
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import com.google.common.eventbus.AllowConcurrentEvents;
|
||||
import com.google.common.eventbus.Subscribe;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
@Subscriber
|
||||
public class StockTradePublisher {
|
||||
|
||||
private final Set<StockTradeListener> stockTradeListeners = new HashSet<>();
|
||||
|
||||
public void addStockTradeListener(StockTradeListener listener) {
|
||||
synchronized (this.stockTradeListeners) {
|
||||
this.stockTradeListeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeStockTradeListener(StockTradeListener listener) {
|
||||
synchronized (this.stockTradeListeners) {
|
||||
this.stockTradeListeners.remove(listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Subscribe
|
||||
@AllowConcurrentEvents
|
||||
private void handleNewStockTradeEvent(StockTrade trade) {
|
||||
// publish to DB, send to PubNub, whatever you want here
|
||||
final Set<StockTradeListener> listeners;
|
||||
synchronized (this.stockTradeListeners) {
|
||||
listeners = new HashSet<>(this.stockTradeListeners);
|
||||
}
|
||||
listeners.forEach(li -> li.stockTradePublished(trade));
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Inherited;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* An annotation which indicates which Guava {@link com.google.common.eventbus.EventBus} a Spring bean wishes to subscribe to.
|
||||
*/
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
@Inherited
|
||||
public @interface Subscriber {
|
||||
|
||||
/**
|
||||
* A SpEL expression which selects the {@link com.google.common.eventbus.EventBus}.
|
||||
*/
|
||||
String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class PostProcessorConfiguration {
|
||||
|
||||
@Bean
|
||||
public GlobalEventBus eventBus() {
|
||||
return GlobalEventBus.getInstance();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
|
||||
return new GuavaEventBusBeanPostProcessor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public StockTradePublisher stockTradePublisher() {
|
||||
return new StockTradePublisher();
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
package com.baeldung.postprocessor;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = {PostProcessorConfiguration.class})
|
||||
public class StockTradeIntegrationTest {
|
||||
|
||||
@Autowired
|
||||
private StockTradePublisher stockTradePublisher;
|
||||
|
||||
@Test
|
||||
public void givenValidConfig_whenTradePublished_thenTradeReceived() {
|
||||
Date tradeDate = new Date();
|
||||
StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
|
||||
AtomicBoolean assertionsPassed = new AtomicBoolean(false);
|
||||
StockTradeListener listener = trade -> assertionsPassed.set(this.verifyExact(stockTrade, trade));
|
||||
this.stockTradePublisher.addStockTradeListener(listener);
|
||||
try {
|
||||
GlobalEventBus.post(stockTrade);
|
||||
await().atMost(Duration.ofSeconds(2L))
|
||||
.untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
|
||||
} finally {
|
||||
this.stockTradePublisher.removeStockTradeListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
|
||||
return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
|
||||
&& Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
|
||||
&& stockTrade.getQuantity() == trade.getQuantity()
|
||||
&& stockTrade.getPrice() == trade.getPrice();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user