001
014
015 package com.liferay.portal.messaging.proxy;
016
017 import com.liferay.portal.kernel.messaging.Message;
018 import com.liferay.portal.kernel.messaging.proxy.BaseProxyBean;
019 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
020 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
021 import com.liferay.portal.kernel.messaging.proxy.ProxyRequest;
022 import com.liferay.portal.kernel.messaging.proxy.ProxyResponse;
023 import com.liferay.portal.kernel.messaging.sender.SingleDestinationMessageSender;
024 import com.liferay.portal.kernel.messaging.sender.SingleDestinationSynchronousMessageSender;
025 import com.liferay.util.aspectj.AspectJUtil;
026
027 import java.util.Map;
028
029 import org.aspectj.lang.ProceedingJoinPoint;
030
031
036 public class MessagingProxyAdvice {
037
038 public Object invoke(ProceedingJoinPoint proceedingJoinPoint)
039 throws Throwable {
040
041 Message message = new Message();
042
043 ProxyRequest proxyRequest = createProxyRequest(proceedingJoinPoint);
044
045 message.setPayload(proxyRequest);
046
047 Map<String, Object> messageValues =
048 MessageValuesThreadLocal.getValues();
049
050 if (!messageValues.isEmpty()) {
051 for (String key : messageValues.keySet()) {
052 message.put(key, messageValues.get(key));
053 }
054 }
055
056 BaseProxyBean baseProxyBean =
057 (BaseProxyBean)proceedingJoinPoint.getTarget();
058
059 if (proxyRequest.isSynchronous() ||
060 ProxyModeThreadLocal.isForceSync()) {
061
062 return doInvokeSynchronous(
063 message, baseProxyBean, proceedingJoinPoint);
064 }
065 else {
066 doInvokeAsynchronous(message, baseProxyBean);
067
068 return null;
069 }
070 }
071
072 protected ProxyRequest createProxyRequest(
073 ProceedingJoinPoint proceedingJoinPoint)
074 throws Exception {
075
076 return new ProxyRequest(
077 AspectJUtil.getMethod(proceedingJoinPoint),
078 proceedingJoinPoint.getArgs());
079 }
080
081 protected void doInvokeAsynchronous(
082 Message message, BaseProxyBean baseProxyBean) {
083
084 SingleDestinationMessageSender messageSender =
085 baseProxyBean.getSingleDestinationMessageSender();
086
087 if (messageSender == null) {
088 throw new IllegalStateException(
089 "Asynchronous message sender was not configured properly for " +
090 baseProxyBean.getClass().getName());
091 }
092
093 messageSender.send(message);
094 }
095
096 protected Object doInvokeSynchronous(
097 Message message, BaseProxyBean baseProxyBean,
098 ProceedingJoinPoint proceedingJoinPoint)
099 throws Throwable {
100
101 SingleDestinationSynchronousMessageSender messageSender =
102 baseProxyBean.getSingleDestinationSynchronousMessageSender();
103
104 if (messageSender == null) {
105 throw new IllegalStateException(
106 "Synchronous message sender was not configured properly for " +
107 baseProxyBean.getClass().getName());
108 }
109
110 ProxyResponse proxyResponse = (ProxyResponse)messageSender.send(
111 message);
112
113 if (proxyResponse == null) {
114 return proceedingJoinPoint.proceed();
115 }
116 else if (proxyResponse.hasError()) {
117 throw proxyResponse.getException();
118 }
119 else {
120 return proxyResponse.getResult();
121 }
122 }
123
124 }