001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.test.rule.callback;
016    
017    import com.liferay.portal.kernel.exception.PortalException;
018    import com.liferay.portal.kernel.exception.SystemException;
019    import com.liferay.portal.kernel.messaging.BaseAsyncDestination;
020    import com.liferay.portal.kernel.messaging.BaseDestination;
021    import com.liferay.portal.kernel.messaging.Destination;
022    import com.liferay.portal.kernel.messaging.DestinationNames;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBus;
025    import com.liferay.portal.kernel.messaging.MessageBusUtil;
026    import com.liferay.portal.kernel.messaging.SynchronousDestination;
027    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
028    import com.liferay.portal.kernel.search.SearchEngineHelperUtil;
029    import com.liferay.portal.kernel.test.rule.Sync;
030    import com.liferay.portal.kernel.test.rule.SynchronousDestinationTestRule;
031    import com.liferay.portal.kernel.test.rule.callback.SynchronousDestinationTestCallback.SyncHandler;
032    import com.liferay.portal.kernel.transaction.Propagation;
033    import com.liferay.portal.kernel.transaction.TransactionAttribute;
034    import com.liferay.portal.kernel.transaction.TransactionInvokerUtil;
035    import com.liferay.registry.Filter;
036    import com.liferay.registry.Registry;
037    import com.liferay.registry.RegistryUtil;
038    import com.liferay.registry.dependency.ServiceDependencyManager;
039    
040    import java.lang.reflect.Method;
041    
042    import java.util.ArrayList;
043    import java.util.List;
044    import java.util.concurrent.Callable;
045    
046    import org.junit.Test;
047    import org.junit.runner.Description;
048    
049    /**
050     * @author Shuyang Zhou
051     */
052    public class SynchronousDestinationTestCallback
053            implements TestCallback<SyncHandler, SyncHandler> {
054    
055            public static final SynchronousDestinationTestCallback INSTANCE =
056                    new SynchronousDestinationTestCallback();
057    
058            @Override
059            public void afterClass(Description description, SyncHandler syncHandler)
060                    throws Exception {
061    
062                    if (syncHandler != null) {
063                            syncHandler.restorePreviousSync();
064                    }
065            }
066    
067            @Override
068            public void afterMethod(
069                    Description description, SyncHandler syncHandler, Object target) {
070    
071                    if (syncHandler != null) {
072                            syncHandler.restorePreviousSync();
073                    }
074            }
075    
076            @Override
077            public SyncHandler beforeClass(Description description) throws Throwable {
078                    Class<?> testClass = description.getTestClass();
079    
080                    Sync sync = testClass.getAnnotation(Sync.class);
081    
082                    if (sync != null) {
083                            return _createSyncHandler(sync);
084                    }
085    
086                    boolean hasSyncedMethod = false;
087    
088                    for (Method method : testClass.getMethods()) {
089                            if ((method.getAnnotation(Sync.class) != null) &&
090                                    (method.getAnnotation(Test.class) != null)) {
091    
092                                    hasSyncedMethod = true;
093    
094                                    break;
095                            }
096                    }
097    
098                    if (!hasSyncedMethod) {
099                            throw new AssertionError(
100                                    testClass.getName() + " uses " +
101                                            SynchronousDestinationTestRule.class.getName() +
102                                                    " without any usage of " + Sync.class.getName());
103                    }
104    
105                    return null;
106            }
107    
108            @Override
109            public SyncHandler beforeMethod(Description description, Object target) {
110                    Class<?> testClass = description.getTestClass();
111    
112                    Sync sync = testClass.getAnnotation(Sync.class);
113    
114                    if (sync != null) {
115                            return null;
116                    }
117    
118                    sync = description.getAnnotation(Sync.class);
119    
120                    if (sync == null) {
121                            return null;
122                    }
123    
124                    return _createSyncHandler(sync);
125            }
126    
127            public static class SyncHandler {
128    
129                    public BaseDestination createSynchronousDestination(
130                            String destinationName) {
131    
132                            SynchronousDestination synchronousDestination = null;
133    
134                            if ((_sync != null) && _sync.cleanTransaction()) {
135                                    synchronousDestination =
136                                            new CleanTransactionSynchronousDestination();
137                            }
138                            else {
139                                    synchronousDestination = new SynchronousDestination();
140                            }
141    
142                            synchronousDestination.setName(destinationName);
143    
144                            return synchronousDestination;
145                    }
146    
147                    public void enableSync() {
148                            if (_sync == null) {
149                                    return;
150                            }
151    
152                            ServiceDependencyManager serviceDependencyManager =
153                                    new ServiceDependencyManager();
154    
155                            Filter asyncFilter = _registerDestinationFilter(
156                                    DestinationNames.ASYNC_SERVICE);
157                            Filter backgroundTaskFilter = _registerDestinationFilter(
158                                    DestinationNames.BACKGROUND_TASK);
159                            Filter backgroundTaskStatusFilter = _registerDestinationFilter(
160                                    DestinationNames.BACKGROUND_TASK_STATUS);
161                            Filter mailFilter = _registerDestinationFilter(
162                                    DestinationNames.MAIL);
163                            Filter pdfProcessorFilter = _registerDestinationFilter(
164                                    DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
165                            Filter rawMetaDataProcessorFilter = _registerDestinationFilter(
166                                    DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
167                            Filter subscrpitionSenderFilter = _registerDestinationFilter(
168                                    DestinationNames.SUBSCRIPTION_SENDER);
169    
170                            serviceDependencyManager.registerDependencies(
171                                    asyncFilter, backgroundTaskFilter, backgroundTaskStatusFilter,
172                                    mailFilter, pdfProcessorFilter, rawMetaDataProcessorFilter,
173                                    subscrpitionSenderFilter);
174    
175                            serviceDependencyManager.waitForDependencies();
176    
177                            ProxyModeThreadLocal.setForceSync(true);
178    
179                            replaceDestination(DestinationNames.ASYNC_SERVICE);
180                            replaceDestination(DestinationNames.BACKGROUND_TASK);
181                            replaceDestination(DestinationNames.BACKGROUND_TASK_STATUS);
182                            replaceDestination(DestinationNames.DOCUMENT_LIBRARY_PDF_PROCESSOR);
183                            replaceDestination(
184                                    DestinationNames.DOCUMENT_LIBRARY_RAW_METADATA_PROCESSOR);
185                            replaceDestination(
186                                    DestinationNames.DOCUMENT_LIBRARY_SYNC_EVENT_PROCESSOR);
187                            replaceDestination(DestinationNames.MAIL);
188                            replaceDestination(DestinationNames.SCHEDULER_ENGINE);
189                            replaceDestination(DestinationNames.SUBSCRIPTION_SENDER);
190    
191                            for (String searchEngineId :
192                                            SearchEngineHelperUtil.getSearchEngineIds()) {
193    
194                                    replaceDestination(
195                                            SearchEngineHelperUtil.getSearchReaderDestinationName(
196                                                    searchEngineId));
197                                    replaceDestination(
198                                            SearchEngineHelperUtil.getSearchWriterDestinationName(
199                                                    searchEngineId));
200                            }
201                    }
202    
203                    public void replaceDestination(String destinationName) {
204                            MessageBus messageBus = MessageBusUtil.getMessageBus();
205    
206                            Destination destination = messageBus.getDestination(
207                                    destinationName);
208    
209                            if (destination instanceof BaseAsyncDestination) {
210                                    _asyncServiceDestinations.add(destination);
211    
212                                    messageBus.replace(
213                                            createSynchronousDestination(destinationName), false);
214                            }
215    
216                            if (destination == null) {
217                                    _absentDestinationNames.add(destinationName);
218    
219                                    messageBus.addDestination(
220                                            createSynchronousDestination(destinationName));
221                            }
222                    }
223    
224                    public void restorePreviousSync() {
225                            if (_sync == null) {
226                                    return;
227                            }
228    
229                            ProxyModeThreadLocal.setForceSync(_forceSync);
230    
231                            MessageBus messageBus = MessageBusUtil.getMessageBus();
232    
233                            for (Destination destination : _asyncServiceDestinations) {
234                                    messageBus.replace(destination);
235                            }
236    
237                            _asyncServiceDestinations.clear();
238    
239                            for (String absentDestinationName : _absentDestinationNames) {
240                                    messageBus.removeDestination(absentDestinationName);
241                            }
242                    }
243    
244                    public void setForceSync(boolean forceSync) {
245                            _forceSync = forceSync;
246                    }
247    
248                    public void setSync(Sync sync) {
249                            _sync = sync;
250                    }
251    
252                    private Filter _registerDestinationFilter(String destinationName) {
253                            Registry registry = RegistryUtil.getRegistry();
254    
255                            return registry.getFilter(
256                                    "(&(destination.name=" + destinationName +
257                                            ")(objectClass=" + Destination.class.getName() + "))");
258                    }
259    
260                    private final List<String> _absentDestinationNames = new ArrayList<>();
261                    private final List<Destination> _asyncServiceDestinations =
262                            new ArrayList<>();
263                    private boolean _forceSync;
264                    private Sync _sync;
265    
266            }
267    
268            protected SynchronousDestinationTestCallback() {
269            }
270    
271            private SyncHandler _createSyncHandler(Sync sync) {
272                    SyncHandler syncHandler = new SyncHandler();
273    
274                    syncHandler.setForceSync(ProxyModeThreadLocal.isForceSync());
275                    syncHandler.setSync(sync);
276    
277                    syncHandler.enableSync();
278    
279                    return syncHandler;
280            }
281    
282            private static final TransactionAttribute _transactionAttribute;
283    
284            static {
285                    TransactionAttribute.Builder builder =
286                            new TransactionAttribute.Builder();
287    
288                    builder.setPropagation(Propagation.NOT_SUPPORTED);
289                    builder.setRollbackForClasses(
290                            PortalException.class, SystemException.class);
291    
292                    _transactionAttribute = builder.build();
293            }
294    
295            private static class CleanTransactionSynchronousDestination
296                    extends SynchronousDestination {
297    
298                    @Override
299                    public void send(final Message message) {
300                            try {
301                                    TransactionInvokerUtil.invoke(
302                                            _transactionAttribute,
303                                            new Callable<Void>() {
304    
305                                                    @Override
306                                                    public Void call() throws Exception {
307                                                            CleanTransactionSynchronousDestination.super.send(
308                                                                    message);
309    
310                                                            return null;
311                                                    }
312    
313                                            });
314                            }
315                            catch (Throwable t) {
316                                    throw new RuntimeException(t);
317                            }
318                    }
319    
320            }
321    
322    }