001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.search;
016    
017    import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.Destination;
023    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
024    import com.liferay.portal.kernel.messaging.DestinationFactory;
025    import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
026    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
027    import com.liferay.portal.kernel.messaging.MessageBus;
028    import com.liferay.portal.kernel.messaging.MessageListener;
029    import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030    import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031    import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032    import com.liferay.portal.kernel.util.GetterUtil;
033    import com.liferay.portal.kernel.util.PortalRunMode;
034    import com.liferay.portal.kernel.util.PropsKeys;
035    import com.liferay.portal.kernel.util.PropsUtil;
036    import com.liferay.portal.kernel.util.StringBundler;
037    import com.liferay.portal.kernel.util.Validator;
038    import com.liferay.registry.Registry;
039    import com.liferay.registry.RegistryUtil;
040    import com.liferay.registry.dependency.ServiceDependencyListener;
041    import com.liferay.registry.dependency.ServiceDependencyManager;
042    
043    import java.util.ArrayList;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    /**
050     * @author Michael C. Han
051     */
052    public abstract class AbstractSearchEngineConfigurator
053            implements SearchEngineConfigurator {
054    
055            @Override
056            public void afterPropertiesSet() {
057                    final ServiceDependencyManager serviceDependencyManager =
058                            new ServiceDependencyManager();
059    
060                    serviceDependencyManager.addServiceDependencyListener(
061                            new ServiceDependencyListener() {
062    
063                                    @Override
064                                    public void dependenciesFulfilled() {
065                                            Registry registry = RegistryUtil.getRegistry();
066    
067                                            _messageBus = registry.getService(MessageBus.class);
068    
069                                            initialize();
070                                    }
071    
072                                    @Override
073                                    public void destroy() {
074                                    }
075    
076                            });
077    
078                    serviceDependencyManager.registerDependencies(
079                            DestinationFactory.class, MessageBus.class);
080            }
081    
082            @Override
083            public void destroy() {
084                    for (SearchEngineRegistration searchEngineRegistration :
085                                    _searchEngineRegistrations) {
086    
087                            destroySearchEngine(searchEngineRegistration);
088                    }
089    
090                    _searchEngineRegistrations.clear();
091    
092                    if (Validator.isNotNull(_originalSearchEngineId)) {
093                            SearchEngineHelperUtil.setDefaultSearchEngineId(
094                                    _originalSearchEngineId);
095    
096                            _originalSearchEngineId = null;
097                    }
098            }
099    
100            @Override
101            public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
102                    _searchEngines = searchEngines;
103            }
104    
105            protected void createSearchEngineListeners(
106                    String searchEngineId, SearchEngine searchEngine,
107                    Destination searchReaderDestination,
108                    Destination searchWriterDestination) {
109    
110                    registerSearchEngineMessageListener(
111                            searchEngineId, searchEngine, searchReaderDestination,
112                            new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
113    
114                    registerSearchEngineMessageListener(
115                            searchEngineId, searchEngine, searchWriterDestination,
116                            new SearchWriterMessageListener(), searchEngine.getIndexWriter());
117            }
118    
119            protected Destination createSearchReaderDestination(
120                    String searchReaderDestinationName) {
121    
122                    DestinationConfiguration destinationConfiguration =
123                            DestinationConfiguration.createSynchronousDestinationConfiguration(
124                                    searchReaderDestinationName);
125    
126                    return DestinationFactoryUtil.createDestination(
127                            destinationConfiguration);
128            }
129    
130            protected Destination createSearchWriterDestination(
131                    String searchWriterDestinationName) {
132    
133                    DestinationConfiguration destinationConfiguration = null;
134    
135                    if (PortalRunMode.isTestMode()) {
136                            destinationConfiguration =
137                                    DestinationConfiguration.
138                                            createSynchronousDestinationConfiguration(
139                                                    searchWriterDestinationName);
140                    }
141                    else {
142                            destinationConfiguration =
143                                    DestinationConfiguration.createParallelDestinationConfiguration(
144                                            searchWriterDestinationName);
145                    }
146    
147                    if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
148                            destinationConfiguration.setMaximumQueueSize(
149                                    _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
150    
151                            RejectedExecutionHandler rejectedExecutionHandler =
152                                    new CallerRunsPolicy() {
153    
154                                            @Override
155                                            public void rejectedExecution(
156                                                    Runnable runnable,
157                                                    ThreadPoolExecutor threadPoolExecutor) {
158    
159                                                    if (_log.isWarnEnabled()) {
160                                                            StringBundler sb = new StringBundler(4);
161    
162                                                            sb.append("The search index writer's task queue ");
163                                                            sb.append("is at its maximum capacity. The ");
164                                                            sb.append("current thread will handle the ");
165                                                            sb.append("request.");
166    
167                                                            _log.warn(sb.toString());
168                                                    }
169    
170                                                    super.rejectedExecution(runnable, threadPoolExecutor);
171                                            }
172    
173                                    };
174    
175                            destinationConfiguration.setRejectedExecutionHandler(
176                                    rejectedExecutionHandler);
177                    }
178    
179                    return DestinationFactoryUtil.createDestination(
180                            destinationConfiguration);
181            }
182    
183            protected void destroySearchEngine(
184                    SearchEngineRegistration searchEngineRegistration) {
185    
186                    _messageBus.removeDestination(
187                            searchEngineRegistration.getSearchReaderDestinationName());
188    
189                    _messageBus.removeDestination(
190                            searchEngineRegistration.getSearchWriterDestinationName());
191    
192                    SearchEngineHelperUtil.removeSearchEngine(
193                            searchEngineRegistration.getSearchEngineId());
194    
195                    if (!searchEngineRegistration.isOverride()) {
196                            return;
197                    }
198    
199                    SearchEngineProxyWrapper originalSearchEngineProxy =
200                            searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
201    
202                    Destination searchReaderDestination = getSearchReaderDestination(
203                            _messageBus, searchEngineRegistration.getSearchEngineId());
204    
205                    registerInvokerMessageListener(
206                            searchReaderDestination,
207                            searchEngineRegistration.getOriginalSearchReaderMessageListeners());
208    
209                    Destination searchWriterDestination = getSearchWriterDestination(
210                            _messageBus, searchEngineRegistration.getSearchEngineId());
211    
212                    registerInvokerMessageListener(
213                            searchWriterDestination,
214                            searchEngineRegistration.getOriginalSearchWriterMessageListeners());
215    
216                    SearchEngineHelperUtil.setSearchEngine(
217                            searchEngineRegistration.getSearchEngineId(),
218                            originalSearchEngineProxy);
219            }
220    
221            protected abstract String getDefaultSearchEngineId();
222    
223            protected abstract IndexSearcher getIndexSearcher();
224    
225            protected abstract IndexWriter getIndexWriter();
226    
227            protected abstract ClassLoader getOperatingClassloader();
228    
229            protected Destination getSearchReaderDestination(
230                    MessageBus messageBus, String searchEngineId) {
231    
232                    String searchReaderDestinationName =
233                            SearchEngineHelperUtil.getSearchReaderDestinationName(
234                                    searchEngineId);
235    
236                    Destination searchReaderDestination = messageBus.getDestination(
237                            searchReaderDestinationName);
238    
239                    if (searchReaderDestination == null) {
240                            searchReaderDestination = createSearchReaderDestination(
241                                    searchReaderDestinationName);
242    
243                            messageBus.addDestination(searchReaderDestination);
244                    }
245    
246                    return searchReaderDestination;
247            }
248    
249            protected Destination getSearchWriterDestination(
250                    MessageBus messageBus, String searchEngineId) {
251    
252                    String searchWriterDestinationName =
253                            SearchEngineHelperUtil.getSearchWriterDestinationName(
254                                    searchEngineId);
255    
256                    Destination searchWriterDestination = messageBus.getDestination(
257                            searchWriterDestinationName);
258    
259                    if (searchWriterDestination == null) {
260                            searchWriterDestination = createSearchWriterDestination(
261                                    searchWriterDestinationName);
262    
263                            messageBus.addDestination(searchWriterDestination);
264                    }
265    
266                    return searchWriterDestination;
267            }
268    
269            protected void initialize() {
270                    Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
271    
272                    for (Entry<String, SearchEngine> entry : entrySet) {
273                            initSearchEngine(entry.getKey(), entry.getValue());
274                    }
275    
276                    String defaultSearchEngineId = getDefaultSearchEngineId();
277    
278                    if (Validator.isNotNull(defaultSearchEngineId)) {
279                            _originalSearchEngineId =
280                                    SearchEngineHelperUtil.getDefaultSearchEngineId();
281    
282                            SearchEngineHelperUtil.setDefaultSearchEngineId(
283                                    defaultSearchEngineId);
284                    }
285    
286                    _searchEngines.clear();
287            }
288    
289            protected void initSearchEngine(
290                    String searchEngineId, SearchEngine searchEngine) {
291    
292                    SearchEngineRegistration searchEngineRegistration =
293                            new SearchEngineRegistration(searchEngineId);
294    
295                    _searchEngineRegistrations.add(searchEngineRegistration);
296    
297                    Destination searchReaderDestination = getSearchReaderDestination(
298                            _messageBus, searchEngineId);
299    
300                    searchEngineRegistration.setSearchReaderDestinationName(
301                            searchReaderDestination.getName());
302    
303                    Destination searchWriterDestination = getSearchWriterDestination(
304                            _messageBus, searchEngineId);
305    
306                    searchEngineRegistration.setSearchWriterDestinationName(
307                            searchWriterDestination.getName());
308    
309                    SearchEngine originalSearchEngine =
310                            SearchEngineHelperUtil.getSearchEngineSilent(searchEngineId);
311    
312                    if (originalSearchEngine != null) {
313                            searchEngineRegistration.setOverride(true);
314    
315                            searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
316                                    (SearchEngineProxyWrapper)originalSearchEngine);
317    
318                            savePreviousSearchEngineListeners(
319                                    searchReaderDestination, searchWriterDestination,
320                                    searchEngineRegistration);
321    
322                            _messageBus.removeDestination(
323                                    searchReaderDestination.getName(), false);
324    
325                            searchReaderDestination = getSearchReaderDestination(
326                                    _messageBus, searchEngineId);
327    
328                            _messageBus.removeDestination(
329                                    searchWriterDestination.getName(), false);
330    
331                            searchWriterDestination = getSearchWriterDestination(
332                                    _messageBus, searchEngineId);
333                    }
334    
335                    createSearchEngineListeners(
336                            searchEngineId, searchEngine, searchReaderDestination,
337                            searchWriterDestination);
338    
339                    SearchEngineProxyWrapper searchEngineProxyWrapper =
340                            new SearchEngineProxyWrapper(
341                                    searchEngine, getIndexSearcher(), getIndexWriter());
342    
343                    SearchEngineHelperUtil.setSearchEngine(
344                            searchEngineId, searchEngineProxyWrapper);
345            }
346    
347            protected void registerInvokerMessageListener(
348                    Destination destination,
349                    List<InvokerMessageListener> invokerMessageListeners) {
350    
351                    for (InvokerMessageListener invokerMessageListener :
352                                    invokerMessageListeners) {
353    
354                            destination.register(
355                                    invokerMessageListener.getMessageListener(),
356                                    invokerMessageListener.getClassLoader());
357                    }
358            }
359    
360            protected void registerSearchEngineMessageListener(
361                    String searchEngineId, SearchEngine searchEngine,
362                    Destination destination,
363                    BaseSearchEngineMessageListener baseSearchEngineMessageListener,
364                    Object manager) {
365    
366                    baseSearchEngineMessageListener.setManager(manager);
367                    baseSearchEngineMessageListener.setMessageBus(_messageBus);
368                    baseSearchEngineMessageListener.setSearchEngine(searchEngine);
369                    baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
370    
371                    destination.register(
372                            baseSearchEngineMessageListener, getOperatingClassloader());
373            }
374    
375            protected void savePreviousSearchEngineListeners(
376                    Destination searchReaderDestination,
377                    Destination searchWriterDestination,
378                    SearchEngineRegistration searchEngineRegistration) {
379    
380                    Set<MessageListener> searchReaderMessageListeners =
381                            searchReaderDestination.getMessageListeners();
382    
383                    for (MessageListener searchReaderMessageListener :
384                                    searchReaderMessageListeners) {
385    
386                            InvokerMessageListener invokerMessageListener =
387                                    (InvokerMessageListener)searchReaderMessageListener;
388    
389                            searchEngineRegistration.addOriginalSearchReaderMessageListener(
390                                    invokerMessageListener);
391                    }
392    
393                    Set<MessageListener> searchWriterMessageListeners =
394                            searchWriterDestination.getMessageListeners();
395    
396                    for (MessageListener searchWriterMessageListener :
397                                    searchWriterMessageListeners) {
398    
399                            InvokerMessageListener invokerMessageListener =
400                                    (InvokerMessageListener)searchWriterMessageListener;
401    
402                            searchEngineRegistration.addOriginalSearchWriterMessageListener(
403                                    invokerMessageListener);
404                    }
405            }
406    
407            private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
408                    GetterUtil.getInteger(
409                            PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
410    
411            private static final Log _log = LogFactoryUtil.getLog(
412                    AbstractSearchEngineConfigurator.class);
413    
414            private volatile MessageBus _messageBus;
415            private String _originalSearchEngineId;
416            private final List<SearchEngineRegistration> _searchEngineRegistrations =
417                    new ArrayList<>();
418            private Map<String, SearchEngine> _searchEngines;
419    
420            private class SearchEngineRegistration {
421    
422                    public void addOriginalSearchReaderMessageListener(
423                            InvokerMessageListener messageListener) {
424    
425                            _originalSearchReaderMessageListeners.add(messageListener);
426                    }
427    
428                    public void addOriginalSearchWriterMessageListener(
429                            InvokerMessageListener messageListener) {
430    
431                            _originalSearchWriterMessageListeners.add(messageListener);
432                    }
433    
434                    public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
435                            return _originalSearchEngineProxyWrapper;
436                    }
437    
438                    public List<InvokerMessageListener>
439                            getOriginalSearchReaderMessageListeners() {
440    
441                            return _originalSearchReaderMessageListeners;
442                    }
443    
444                    public List<InvokerMessageListener>
445                            getOriginalSearchWriterMessageListeners() {
446    
447                            return _originalSearchWriterMessageListeners;
448                    }
449    
450                    public String getSearchEngineId() {
451                            return _searchEngineId;
452                    }
453    
454                    public String getSearchReaderDestinationName() {
455                            return _searchReaderDestinationName;
456                    }
457    
458                    public String getSearchWriterDestinationName() {
459                            return _searchWriterDestinationName;
460                    }
461    
462                    public boolean isOverride() {
463                            return _override;
464                    }
465    
466                    public void setOriginalSearchEngineProxyWrapper(
467                            SearchEngineProxyWrapper searchEngineProxyWrapper) {
468    
469                            _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
470                    }
471    
472                    public void setOverride(boolean override) {
473                            _override = override;
474                    }
475    
476                    public void setSearchReaderDestinationName(
477                            String searchReaderDestinationName) {
478    
479                            _searchReaderDestinationName = searchReaderDestinationName;
480                    }
481    
482                    public void setSearchWriterDestinationName(
483                            String searchWriterDestinationName) {
484    
485                            _searchWriterDestinationName = searchWriterDestinationName;
486                    }
487    
488                    private SearchEngineRegistration(String searchEngineId) {
489                            _searchEngineId = searchEngineId;
490                    }
491    
492                    private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
493                    private final List<InvokerMessageListener>
494                            _originalSearchReaderMessageListeners = new ArrayList<>();
495                    private final List<InvokerMessageListener>
496                            _originalSearchWriterMessageListeners = new ArrayList<>();
497                    private boolean _override;
498                    private final String _searchEngineId;
499                    private String _searchReaderDestinationName;
500                    private String _searchWriterDestinationName;
501    
502            }
503    
504    }