001
014
015 package com.liferay.portal.test;
016
017 import com.liferay.portal.kernel.exception.PortalException;
018 import com.liferay.portal.kernel.exception.SystemException;
019 import com.liferay.portal.kernel.messaging.BaseAsyncDestination;
020 import com.liferay.portal.kernel.messaging.BaseDestination;
021 import com.liferay.portal.kernel.messaging.Destination;
022 import com.liferay.portal.kernel.messaging.DestinationNames;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBus;
025 import com.liferay.portal.kernel.messaging.MessageBusUtil;
026 import com.liferay.portal.kernel.messaging.SynchronousDestination;
027 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
028 import com.liferay.portal.kernel.test.BaseTestRule;
029 import com.liferay.portal.kernel.transaction.Propagation;
030 import com.liferay.portal.kernel.transaction.TransactionAttribute;
031 import com.liferay.portal.kernel.transaction.TransactionInvokerUtil;
032 import com.liferay.portal.test.SynchronousDestinationTestRule.SyncHandler;
033
034 import java.util.ArrayList;
035 import java.util.List;
036 import java.util.concurrent.Callable;
037
038 import org.junit.runner.Description;
039
040
044 public class SynchronousDestinationTestRule
045 extends BaseTestRule<SyncHandler, SyncHandler> {
046
047 public static final SynchronousDestinationTestRule INSTANCE =
048 new SynchronousDestinationTestRule();
049
050 protected SynchronousDestinationTestRule() {
051 }
052
053 @Override
054 protected void afterMethod(
055 Description description, SyncHandler syncHandler) {
056
057 syncHandler.restorePreviousSync();
058 }
059
060 @Override
061 protected SyncHandler beforeMethod(Description description) {
062 Sync sync = description.getAnnotation(Sync.class);
063
064 if (sync == null) {
065 Class<?> testClass = description.getTestClass();
066
067 sync = testClass.getAnnotation(Sync.class);
068 }
069
070 SyncHandler syncHandler = new SyncHandler();
071
072 syncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
073 syncHandler.setSync(sync);
074
075 syncHandler.enableSync();
076
077 return syncHandler;
078 }
079
080 protected static class SyncHandler {
081
082 public BaseDestination createSynchronousDestination(
083 String destinationName) {
084
085 SynchronousDestination synchronousDestination;
086
087 if ((_sync != null) && _sync.cleanTransaction()) {
088 synchronousDestination =
089 new CleanTransactionSynchronousDestination();
090 }
091 else {
092 synchronousDestination = new SynchronousDestination();
093 }
094
095 synchronousDestination.setName(destinationName);
096
097 return synchronousDestination;
098 }
099
100 public void enableSync() {
101 if (_sync == null) {
102 return;
103 }
104
105 ProxyModeThreadLocal.setForceSync(true);
106
107 replaceDestination(DestinationNames.ASYNC_SERVICE);
108 replaceDestination(DestinationNames.BACKGROUND_TASK);
109 replaceDestination(
110 DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
111 replaceDestination(
112 DestinationNames.DOCUMENT_LIBRARY_SYNC_EVENT_PROCESSOR);
113 replaceDestination(DestinationNames.MAIL);
114 replaceDestination(DestinationNames.SEARCH_READER);
115 replaceDestination(DestinationNames.SEARCH_WRITER);
116 replaceDestination(DestinationNames.SUBSCRIPTION_SENDER);
117 }
118
119 public void replaceDestination(String destinationName) {
120 MessageBus messageBus = MessageBusUtil.getMessageBus();
121
122 Destination destination = messageBus.getDestination(
123 destinationName);
124
125 if (destination instanceof BaseAsyncDestination) {
126 _asyncServiceDestinations.add(destination);
127
128 messageBus.replace(
129 createSynchronousDestination(destinationName));
130 }
131
132 if (destination == null) {
133 _absentDestinationNames.add(destinationName);
134
135 messageBus.addDestination(
136 createSynchronousDestination(destinationName));
137 }
138 }
139
140 public void restorePreviousSync() {
141 if (_sync == null) {
142 return;
143 }
144
145 ProxyModeThreadLocal.setForceSync(_forceSync);
146
147 MessageBus messageBus = MessageBusUtil.getMessageBus();
148
149 for (Destination destination : _asyncServiceDestinations) {
150 messageBus.replace(destination);
151 }
152
153 _asyncServiceDestinations.clear();
154
155 for (String absentDestinationName : _absentDestinationNames) {
156 messageBus.removeDestination(absentDestinationName);
157 }
158 }
159
160 public void setForceSync(boolean forceSync) {
161 _forceSync = forceSync;
162 }
163
164 public void setSync(Sync sync) {
165 _sync = sync;
166 }
167
168 private final List<String> _absentDestinationNames =
169 new ArrayList<String>();
170 private final List<Destination> _asyncServiceDestinations =
171 new ArrayList<Destination>();
172 private boolean _forceSync;
173 private Sync _sync;
174
175 }
176
177 private static final TransactionAttribute _transactionAttribute;
178
179 static {
180 TransactionAttribute.Builder builder =
181 new TransactionAttribute.Builder();
182
183 builder.setPropagation(Propagation.NOT_SUPPORTED);
184 builder.setRollbackForClasses(
185 PortalException.class, SystemException.class);
186
187 _transactionAttribute = builder.build();
188 }
189
190 private static class CleanTransactionSynchronousDestination
191 extends SynchronousDestination {
192
193 @Override
194 public void send(final Message message) {
195 try {
196 TransactionInvokerUtil.invoke(
197 _transactionAttribute, new Callable<Void>() {
198
199 @Override
200 public Void call() throws Exception {
201 CleanTransactionSynchronousDestination.super.send(
202 message);
203
204 return null;
205 }
206 });
207 }
208 catch (Throwable t) {
209 throw new RuntimeException(t);
210 }
211 }
212
213 }
214
215 }