001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.test.rule.callback;
016    
017    import com.liferay.portal.kernel.exception.PortalException;
018    import com.liferay.portal.kernel.exception.SystemException;
019    import com.liferay.portal.kernel.messaging.BaseAsyncDestination;
020    import com.liferay.portal.kernel.messaging.BaseDestination;
021    import com.liferay.portal.kernel.messaging.Destination;
022    import com.liferay.portal.kernel.messaging.DestinationNames;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBus;
025    import com.liferay.portal.kernel.messaging.MessageBusUtil;
026    import com.liferay.portal.kernel.messaging.SynchronousDestination;
027    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
028    import com.liferay.portal.kernel.test.rule.Sync;
029    import com.liferay.portal.kernel.test.rule.SynchronousDestinationTestRule;
030    import com.liferay.portal.kernel.test.rule.callback.SynchronousDestinationTestCallback.SyncHandler;
031    import com.liferay.portal.kernel.transaction.Propagation;
032    import com.liferay.portal.kernel.transaction.TransactionAttribute;
033    import com.liferay.portal.kernel.transaction.TransactionInvokerUtil;
034    import com.liferay.registry.Filter;
035    import com.liferay.registry.Registry;
036    import com.liferay.registry.RegistryUtil;
037    import com.liferay.registry.dependency.ServiceDependencyManager;
038    
039    import java.lang.reflect.Method;
040    
041    import java.util.ArrayList;
042    import java.util.List;
043    import java.util.concurrent.Callable;
044    
045    import org.junit.Test;
046    import org.junit.runner.Description;
047    
048    /**
049     * @author Shuyang Zhou
050     */
051    public class SynchronousDestinationTestCallback
052            implements TestCallback<SyncHandler, SyncHandler> {
053    
054            public static final SynchronousDestinationTestCallback INSTANCE =
055                    new SynchronousDestinationTestCallback();
056    
057            @Override
058            public void afterClass(Description description, SyncHandler syncHandler)
059                    throws Exception {
060    
061                    if (syncHandler != null) {
062                            syncHandler.restorePreviousSync();
063                    }
064            }
065    
066            @Override
067            public void afterMethod(
068                    Description description, SyncHandler syncHandler, Object target) {
069    
070                    if (syncHandler != null) {
071                            syncHandler.restorePreviousSync();
072                    }
073            }
074    
075            @Override
076            public SyncHandler beforeClass(Description description) throws Throwable {
077                    Class<?> testClass = description.getTestClass();
078    
079                    Sync sync = testClass.getAnnotation(Sync.class);
080    
081                    if (sync != null) {
082                            return _createSyncHandler(sync);
083                    }
084    
085                    boolean hasSyncedMethod = false;
086    
087                    for (Method method : testClass.getMethods()) {
088                            if ((method.getAnnotation(Sync.class) != null) &&
089                                    (method.getAnnotation(Test.class) != null)) {
090    
091                                    hasSyncedMethod = true;
092    
093                                    break;
094                            }
095                    }
096    
097                    if (!hasSyncedMethod) {
098                            throw new AssertionError(
099                                    testClass.getName() + " uses " +
100                                            SynchronousDestinationTestRule.class.getName() +
101                                                    " without any usage of " + Sync.class.getName());
102                    }
103    
104                    return null;
105            }
106    
107            @Override
108            public SyncHandler beforeMethod(Description description, Object target) {
109                    Class<?> testClass = description.getTestClass();
110    
111                    Sync sync = testClass.getAnnotation(Sync.class);
112    
113                    if (sync != null) {
114                            return null;
115                    }
116    
117                    sync = description.getAnnotation(Sync.class);
118    
119                    if (sync == null) {
120                            return null;
121                    }
122    
123                    return _createSyncHandler(sync);
124            }
125    
126            public static class SyncHandler {
127    
128                    public BaseDestination createSynchronousDestination(
129                            String destinationName) {
130    
131                            SynchronousDestination synchronousDestination = null;
132    
133                            if ((_sync != null) && _sync.cleanTransaction()) {
134                                    synchronousDestination =
135                                            new CleanTransactionSynchronousDestination();
136                            }
137                            else {
138                                    synchronousDestination = new SynchronousDestination();
139                            }
140    
141                            synchronousDestination.setName(destinationName);
142    
143                            return synchronousDestination;
144                    }
145    
146                    public void enableSync() {
147                            if (_sync == null) {
148                                    return;
149                            }
150    
151                            ServiceDependencyManager serviceDependencyManager =
152                                    new ServiceDependencyManager();
153    
154                            Filter asyncFilter = _registerDestinationFilter(
155                                    DestinationNames.ASYNC_SERVICE);
156                            Filter backgroundTaskFilter = _registerDestinationFilter(
157                                    DestinationNames.BACKGROUND_TASK);
158                            Filter backgroundTaskStatusFilter = _registerDestinationFilter(
159                                    DestinationNames.BACKGROUND_TASK_STATUS);
160                            Filter mailFilter = _registerDestinationFilter(
161                                    DestinationNames.MAIL);
162                            Filter pdfProcessorFilter = _registerDestinationFilter(
163                                    DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
164                            Filter rawMetaDataProcessorFilter = _registerDestinationFilter(
165                                    DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
166                            Filter subscrpitionSenderFilter = _registerDestinationFilter(
167                                    DestinationNames.SUBSCRIPTION_SENDER);
168    
169                            serviceDependencyManager.registerDependencies(
170                                    asyncFilter, backgroundTaskFilter, backgroundTaskStatusFilter,
171                                    mailFilter, pdfProcessorFilter, rawMetaDataProcessorFilter,
172                                    subscrpitionSenderFilter);
173    
174                            serviceDependencyManager.waitForDependencies();
175    
176                            ProxyModeThreadLocal.setForceSync(true);
177    
178                            replaceDestination(DestinationNames.ASYNC_SERVICE);
179                            replaceDestination(DestinationNames.BACKGROUND_TASK);
180                            replaceDestination(DestinationNames.BACKGROUND_TASK_STATUS);
181                            replaceDestination(DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
182                            replaceDestination(
183                                    DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
184                            replaceDestination(
185                                    DestinationNames.DOCUMENT_LIBRARY_SYNC_EVENT_PROCESSOR);
186                            replaceDestination(DestinationNames.MAIL);
187                            replaceDestination(DestinationNames.SCHEDULER_ENGINE);
188                            replaceDestination(DestinationNames.SEARCH_READER);
189                            replaceDestination(DestinationNames.SEARCH_WRITER);
190                            replaceDestination(DestinationNames.SUBSCRIPTION_SENDER);
191                    }
192    
193                    public void replaceDestination(String destinationName) {
194                            MessageBus messageBus = MessageBusUtil.getMessageBus();
195    
196                            Destination destination = messageBus.getDestination(
197                                    destinationName);
198    
199                            if (destination instanceof BaseAsyncDestination) {
200                                    _asyncServiceDestinations.add(destination);
201    
202                                    messageBus.replace(
203                                            createSynchronousDestination(destinationName), false);
204                            }
205    
206                            if (destination == null) {
207                                    _absentDestinationNames.add(destinationName);
208    
209                                    messageBus.addDestination(
210                                            createSynchronousDestination(destinationName));
211                            }
212                    }
213    
214                    public void restorePreviousSync() {
215                            if (_sync == null) {
216                                    return;
217                            }
218    
219                            ProxyModeThreadLocal.setForceSync(_forceSync);
220    
221                            MessageBus messageBus = MessageBusUtil.getMessageBus();
222    
223                            for (Destination destination : _asyncServiceDestinations) {
224                                    messageBus.replace(destination);
225                            }
226    
227                            _asyncServiceDestinations.clear();
228    
229                            for (String absentDestinationName : _absentDestinationNames) {
230                                    messageBus.removeDestination(absentDestinationName);
231                            }
232                    }
233    
234                    public void setForceSync(boolean forceSync) {
235                            _forceSync = forceSync;
236                    }
237    
238                    public void setSync(Sync sync) {
239                            _sync = sync;
240                    }
241    
242                    private Filter _registerDestinationFilter(String destinationName) {
243                            Registry registry = RegistryUtil.getRegistry();
244    
245                            return registry.getFilter(
246                                    "(&(destination.name=" + destinationName +
247                                            ")(objectClass=" + Destination.class.getName() + "))");
248                    }
249    
250                    private final List<String> _absentDestinationNames = new ArrayList<>();
251                    private final List<Destination> _asyncServiceDestinations =
252                            new ArrayList<>();
253                    private boolean _forceSync;
254                    private Sync _sync;
255    
256            }
257    
258            protected SynchronousDestinationTestCallback() {
259            }
260    
261            private SyncHandler _createSyncHandler(Sync sync) {
262                    SyncHandler syncHandler = new SyncHandler();
263    
264                    syncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
265                    syncHandler.setSync(sync);
266    
267                    syncHandler.enableSync();
268    
269                    return syncHandler;
270            }
271    
272            private static final TransactionAttribute _transactionAttribute;
273    
274            static {
275                    TransactionAttribute.Builder builder =
276                            new TransactionAttribute.Builder();
277    
278                    builder.setPropagation(Propagation.NOT_SUPPORTED);
279                    builder.setRollbackForClasses(
280                            PortalException.class, SystemException.class);
281    
282                    _transactionAttribute = builder.build();
283            }
284    
285            private static class CleanTransactionSynchronousDestination
286                    extends SynchronousDestination {
287    
288                    @Override
289                    public void send(final Message message) {
290                            try {
291                                    TransactionInvokerUtil.invoke(
292                                            _transactionAttribute, new Callable<Void>() {
293    
294                                            @Override
295                                            public Void call() throws Exception {
296                                                    CleanTransactionSynchronousDestination.super.send(
297                                                            message);
298    
299                                                    return null;
300                                            }
301                                    });
302                            }
303                            catch (Throwable t) {
304                                    throw new RuntimeException(t);
305                            }
306                    }
307    
308            }
309    
310    }