001
014
015 package com.liferay.portal.test;
016
017 import com.liferay.portal.kernel.annotation.AnnotationLocator;
018 import com.liferay.portal.kernel.exception.PortalException;
019 import com.liferay.portal.kernel.exception.SystemException;
020 import com.liferay.portal.kernel.messaging.BaseAsyncDestination;
021 import com.liferay.portal.kernel.messaging.BaseDestination;
022 import com.liferay.portal.kernel.messaging.Destination;
023 import com.liferay.portal.kernel.messaging.DestinationNames;
024 import com.liferay.portal.kernel.messaging.Message;
025 import com.liferay.portal.kernel.messaging.MessageBus;
026 import com.liferay.portal.kernel.messaging.MessageBusUtil;
027 import com.liferay.portal.kernel.messaging.SynchronousDestination;
028 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
029 import com.liferay.portal.kernel.test.AbstractExecutionTestListener;
030 import com.liferay.portal.kernel.test.TestContext;
031 import com.liferay.portal.kernel.transaction.Propagation;
032 import com.liferay.portal.kernel.transaction.TransactionAttribute;
033 import com.liferay.portal.kernel.transaction.TransactionInvokerUtil;
034
035 import java.lang.reflect.Method;
036
037 import java.util.ArrayList;
038 import java.util.List;
039 import java.util.concurrent.Callable;
040
041
044 public class SynchronousDestinationExecutionTestListener
045 extends AbstractExecutionTestListener {
046
047 @Override
048 public void runAfterClass(TestContext testContext) {
049 classSyncHandler.restorePreviousSync();
050 }
051
052 @Override
053 public void runAfterTest(TestContext testContext) {
054 methodSyncHandler.restorePreviousSync();
055 }
056
057 @Override
058 public void runBeforeClass(TestContext testContext) {
059 Class<?> testClass = testContext.getClazz();
060
061 Sync sync = AnnotationLocator.locate(testClass, Sync.class);
062
063 classSyncHandler.setSync(sync);
064 classSyncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
065
066 classSyncHandler.enableSync();
067 }
068
069 @Override
070 public void runBeforeTest(TestContext testContext) {
071 Method method = testContext.getMethod();
072 Class<?> testClass = testContext.getClazz();
073
074 Sync sync = AnnotationLocator.locate(method, testClass, Sync.class);
075
076 methodSyncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
077 methodSyncHandler.setSync(sync);
078
079 methodSyncHandler.enableSync();
080 }
081
082 protected SyncHandler classSyncHandler = new SyncHandler();
083 protected SyncHandler methodSyncHandler = new SyncHandler();
084
085 protected class SyncHandler {
086
087 public BaseDestination createSynchronousDestination(
088 String destinationName) {
089
090 SynchronousDestination synchronousDestination;
091
092 if ((_sync != null) && _sync.cleanTransaction()) {
093 synchronousDestination =
094 new CleanTransactionSynchronousDestination();
095 }
096 else {
097 synchronousDestination = new SynchronousDestination();
098 }
099
100 synchronousDestination.setName(destinationName);
101
102 return synchronousDestination;
103 }
104
105 public void enableSync() {
106 if (_sync == null) {
107 return;
108 }
109
110 ProxyModeThreadLocal.setForceSync(true);
111
112 replaceDestination(DestinationNames.ASYNC_SERVICE);
113 replaceDestination(DestinationNames.BACKGROUND_TASK);
114 replaceDestination(
115 DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
116 replaceDestination(
117 DestinationNames.DOCUMENT_LIBRARY_SYNC_EVENT_PROCESSOR);
118 replaceDestination(DestinationNames.MAIL);
119 replaceDestination(DestinationNames.SEARCH_READER);
120 replaceDestination(DestinationNames.SEARCH_WRITER);
121 replaceDestination(DestinationNames.SUBSCRIPTION_SENDER);
122 }
123
124 public void replaceDestination(String destinationName) {
125 MessageBus messageBus = MessageBusUtil.getMessageBus();
126
127 Destination destination = messageBus.getDestination(
128 destinationName);
129
130 if (destination instanceof BaseAsyncDestination) {
131 _asyncServiceDestinations.add(destination);
132
133 messageBus.replace(
134 createSynchronousDestination(destinationName));
135 }
136
137 if (destination == null) {
138 _absentDestinationNames.add(destinationName);
139
140 messageBus.addDestination(
141 createSynchronousDestination(destinationName));
142 }
143 }
144
145 public void restorePreviousSync() {
146 if (_sync == null) {
147 return;
148 }
149
150 ProxyModeThreadLocal.setForceSync(_forceSync);
151
152 MessageBus messageBus = MessageBusUtil.getMessageBus();
153
154 for (Destination destination : _asyncServiceDestinations) {
155 messageBus.replace(destination);
156 }
157
158 _asyncServiceDestinations.clear();
159
160 for (String absentDestinationName : _absentDestinationNames) {
161 messageBus.removeDestination(absentDestinationName);
162 }
163 }
164
165 public void setForceSync(boolean forceSync) {
166 _forceSync = forceSync;
167 }
168
169 public void setSync(Sync sync) {
170 _sync = sync;
171 }
172
173 private final List<String> _absentDestinationNames =
174 new ArrayList<String>();
175 private final List<Destination> _asyncServiceDestinations =
176 new ArrayList<Destination>();
177 private boolean _forceSync;
178 private Sync _sync;
179
180 }
181
182 private static final TransactionAttribute _transactionAttribute;
183
184 static {
185 TransactionAttribute.Builder builder =
186 new TransactionAttribute.Builder();
187
188 builder.setPropagation(Propagation.NOT_SUPPORTED);
189 builder.setRollbackForClasses(
190 PortalException.class, SystemException.class);
191
192 _transactionAttribute = builder.build();
193 }
194
195 private static class CleanTransactionSynchronousDestination
196 extends SynchronousDestination {
197
198 @Override
199 public void send(final Message message) {
200 try {
201 TransactionInvokerUtil.invoke(
202 _transactionAttribute, new Callable<Void>() {
203
204 @Override
205 public Void call() throws Exception {
206 CleanTransactionSynchronousDestination.super.send(
207 message);
208
209 return null;
210 }
211 });
212 }
213 catch (Throwable t) {
214 throw new RuntimeException(t);
215 }
216 }
217 }
218
219 }