001
014
015 package com.liferay.portal.kernel.test.rule.callback;
016
017 import com.liferay.portal.kernel.exception.PortalException;
018 import com.liferay.portal.kernel.exception.SystemException;
019 import com.liferay.portal.kernel.messaging.BaseAsyncDestination;
020 import com.liferay.portal.kernel.messaging.BaseDestination;
021 import com.liferay.portal.kernel.messaging.Destination;
022 import com.liferay.portal.kernel.messaging.DestinationNames;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBus;
025 import com.liferay.portal.kernel.messaging.MessageBusUtil;
026 import com.liferay.portal.kernel.messaging.SynchronousDestination;
027 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
028 import com.liferay.portal.kernel.test.rule.Sync;
029 import com.liferay.portal.kernel.test.rule.callback.SynchronousDestinationTestCallback.SyncHandler;
030 import com.liferay.portal.kernel.transaction.Propagation;
031 import com.liferay.portal.kernel.transaction.TransactionAttribute;
032 import com.liferay.portal.kernel.transaction.TransactionInvokerUtil;
033 import com.liferay.registry.Filter;
034 import com.liferay.registry.Registry;
035 import com.liferay.registry.RegistryUtil;
036 import com.liferay.registry.dependency.ServiceDependencyManager;
037
038 import java.util.ArrayList;
039 import java.util.List;
040 import java.util.concurrent.Callable;
041
042 import org.junit.runner.Description;
043
044
047 public class SynchronousDestinationTestCallback
048 extends BaseTestCallback<SyncHandler, SyncHandler> {
049
050 public static final SynchronousDestinationTestCallback INSTANCE =
051 new SynchronousDestinationTestCallback();
052
053 @Override
054 public void doAfterMethod(
055 Description description, SyncHandler syncHandler, Object target) {
056
057 syncHandler.restorePreviousSync();
058 }
059
060 @Override
061 public SyncHandler doBeforeMethod(Description description, Object target) {
062 Sync sync = description.getAnnotation(Sync.class);
063
064 if (sync == null) {
065 Class<?> testClass = description.getTestClass();
066
067 sync = testClass.getAnnotation(Sync.class);
068 }
069
070 SyncHandler syncHandler = new SyncHandler();
071
072 syncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
073 syncHandler.setSync(sync);
074
075 syncHandler.enableSync();
076
077 return syncHandler;
078 }
079
080 public static class SyncHandler {
081
082 public BaseDestination createSynchronousDestination(
083 String destinationName) {
084
085 SynchronousDestination synchronousDestination;
086
087 if ((_sync != null) && _sync.cleanTransaction()) {
088 synchronousDestination =
089 new CleanTransactionSynchronousDestination();
090 }
091 else {
092 synchronousDestination = new SynchronousDestination();
093 }
094
095 synchronousDestination.setName(destinationName);
096
097 return synchronousDestination;
098 }
099
100 public void enableSync() {
101 if (_sync == null) {
102 return;
103 }
104
105 ServiceDependencyManager serviceDependencyManager =
106 new ServiceDependencyManager();
107
108 Filter asyncFilter = _registerDestinationFilter(
109 DestinationNames.ASYNC_SERVICE);
110 Filter backgroundTaskFilter = _registerDestinationFilter(
111 DestinationNames.BACKGROUND_TASK);
112 Filter mailFilter = _registerDestinationFilter(
113 DestinationNames.MAIL);
114 Filter pdfProcessorFilter = _registerDestinationFilter(
115 DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
116 Filter rawMetaDataProcessorFilter = _registerDestinationFilter(
117 DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
118 Filter subscrpitionSenderFilter = _registerDestinationFilter(
119 DestinationNames.SUBSCRIPTION_SENDER);
120
121 serviceDependencyManager.registerDependencies(
122 asyncFilter, backgroundTaskFilter, mailFilter,
123 pdfProcessorFilter, rawMetaDataProcessorFilter,
124 subscrpitionSenderFilter);
125
126 serviceDependencyManager.waitForDependencies();
127
128 ProxyModeThreadLocal.setForceSync(true);
129
130 replaceDestination(DestinationNames.ASYNC_SERVICE);
131 replaceDestination(DestinationNames.BACKGROUND_TASK);
132 replaceDestination(DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
133 replaceDestination(
134 DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
135 replaceDestination(
136 DestinationNames.DOCUMENT_LIBRARY_SYNC_EVENT_PROCESSOR);
137 replaceDestination(DestinationNames.MAIL);
138 replaceDestination(DestinationNames.SEARCH_READER);
139 replaceDestination(DestinationNames.SEARCH_WRITER);
140 replaceDestination(DestinationNames.SUBSCRIPTION_SENDER);
141 }
142
143 public void replaceDestination(String destinationName) {
144 MessageBus messageBus = MessageBusUtil.getMessageBus();
145
146 Destination destination = messageBus.getDestination(
147 destinationName);
148
149 if (destination instanceof BaseAsyncDestination) {
150 _asyncServiceDestinations.add(destination);
151
152 messageBus.replace(
153 createSynchronousDestination(destinationName), false);
154 }
155
156 if (destination == null) {
157 _absentDestinationNames.add(destinationName);
158
159 messageBus.addDestination(
160 createSynchronousDestination(destinationName));
161 }
162 }
163
164 public void restorePreviousSync() {
165 if (_sync == null) {
166 return;
167 }
168
169 ProxyModeThreadLocal.setForceSync(_forceSync);
170
171 MessageBus messageBus = MessageBusUtil.getMessageBus();
172
173 for (Destination destination : _asyncServiceDestinations) {
174 messageBus.replace(destination);
175 }
176
177 _asyncServiceDestinations.clear();
178
179 for (String absentDestinationName : _absentDestinationNames) {
180 messageBus.removeDestination(absentDestinationName);
181 }
182 }
183
184 public void setForceSync(boolean forceSync) {
185 _forceSync = forceSync;
186 }
187
188 public void setSync(Sync sync) {
189 _sync = sync;
190 }
191
192 private Filter _registerDestinationFilter(String destinationName) {
193 Registry registry = RegistryUtil.getRegistry();
194
195 return registry.getFilter(
196 "(&(destination.name=" + destinationName +
197 ")(objectClass=" + Destination.class.getName() + "))");
198 }
199
200 private final List<String> _absentDestinationNames = new ArrayList<>();
201 private final List<Destination> _asyncServiceDestinations =
202 new ArrayList<>();
203 private boolean _forceSync;
204 private Sync _sync;
205
206 }
207
208 protected SynchronousDestinationTestCallback() {
209 }
210
211 private static final TransactionAttribute _transactionAttribute;
212
213 static {
214 TransactionAttribute.Builder builder =
215 new TransactionAttribute.Builder();
216
217 builder.setPropagation(Propagation.NOT_SUPPORTED);
218 builder.setRollbackForClasses(
219 PortalException.class, SystemException.class);
220
221 _transactionAttribute = builder.build();
222 }
223
224 private static class CleanTransactionSynchronousDestination
225 extends SynchronousDestination {
226
227 @Override
228 public void send(final Message message) {
229 try {
230 TransactionInvokerUtil.invoke(
231 _transactionAttribute, new Callable<Void>() {
232
233 @Override
234 public Void call() throws Exception {
235 CleanTransactionSynchronousDestination.super.send(
236 message);
237
238 return null;
239 }
240 });
241 }
242 catch (Throwable t) {
243 throw new RuntimeException(t);
244 }
245 }
246
247 }
248
249 }