001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.search;
016    
017    import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.Destination;
023    import com.liferay.portal.kernel.messaging.DestinationConfiguration;
024    import com.liferay.portal.kernel.messaging.DestinationFactory;
025    import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
026    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
027    import com.liferay.portal.kernel.messaging.MessageBus;
028    import com.liferay.portal.kernel.messaging.MessageListener;
029    import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030    import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031    import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032    import com.liferay.portal.kernel.util.GetterUtil;
033    import com.liferay.portal.kernel.util.PortalRunMode;
034    import com.liferay.portal.kernel.util.PropsKeys;
035    import com.liferay.portal.kernel.util.PropsUtil;
036    import com.liferay.portal.kernel.util.StringBundler;
037    import com.liferay.portal.kernel.util.Validator;
038    import com.liferay.registry.Registry;
039    import com.liferay.registry.RegistryUtil;
040    import com.liferay.registry.dependency.ServiceDependencyListener;
041    import com.liferay.registry.dependency.ServiceDependencyManager;
042    
043    import java.util.ArrayList;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    /**
050     * @author Michael C. Han
051     */
052    public abstract class AbstractSearchEngineConfigurator
053            implements SearchEngineConfigurator {
054    
055            @Override
056            public void afterPropertiesSet() {
057                    final ServiceDependencyManager serviceDependencyManager =
058                            new ServiceDependencyManager();
059    
060                    serviceDependencyManager.addServiceDependencyListener(
061                            new ServiceDependencyListener() {
062    
063                                    @Override
064                                    public void dependenciesFulfilled() {
065                                            Registry registry = RegistryUtil.getRegistry();
066    
067                                            _messageBus = registry.getService(MessageBus.class);
068    
069                                            initialize();
070                                    }
071    
072                                    @Override
073                                    public void destroy() {
074                                    }
075    
076                            });
077    
078                    serviceDependencyManager.registerDependencies(
079                            DestinationFactory.class, MessageBus.class);
080            }
081    
082            @Override
083            public void destroy() {
084                    for (SearchEngineRegistration searchEngineRegistration :
085                                    _searchEngineRegistrations) {
086    
087                            destroySearchEngine(searchEngineRegistration);
088                    }
089    
090                    _searchEngineRegistrations.clear();
091    
092                    if (Validator.isNotNull(_originalSearchEngineId)) {
093                            SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
094    
095                            _originalSearchEngineId = null;
096                    }
097            }
098    
099            @Override
100            public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
101                    _searchEngines = searchEngines;
102            }
103    
104            protected void createSearchEngineListeners(
105                    String searchEngineId, SearchEngine searchEngine,
106                    Destination searchReaderDestination,
107                    Destination searchWriterDestination) {
108    
109                    registerSearchEngineMessageListener(
110                            searchEngineId, searchEngine, searchReaderDestination,
111                            new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
112    
113                    registerSearchEngineMessageListener(
114                            searchEngineId, searchEngine, searchWriterDestination,
115                            new SearchWriterMessageListener(), searchEngine.getIndexWriter());
116            }
117    
118            protected Destination createSearchReaderDestination(
119                    String searchReaderDestinationName) {
120    
121                    DestinationConfiguration destinationConfiguration =
122                            DestinationConfiguration.createSynchronousDestinationConfiguration(
123                                    searchReaderDestinationName);
124    
125                    return DestinationFactoryUtil.createDestination(
126                            destinationConfiguration);
127            }
128    
129            protected Destination createSearchWriterDestination(
130                    String searchWriterDestinationName) {
131    
132                    DestinationConfiguration destinationConfiguration = null;
133    
134                    if (PortalRunMode.isTestMode()) {
135                            destinationConfiguration =
136                                    DestinationConfiguration.
137                                            createSynchronousDestinationConfiguration(
138                                                    searchWriterDestinationName);
139                    }
140                    else {
141                            destinationConfiguration =
142                                    DestinationConfiguration.createParallelDestinationConfiguration(
143                                            searchWriterDestinationName);
144                    }
145    
146                    if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
147                            destinationConfiguration.setMaximumQueueSize(
148                                    _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
149    
150                            RejectedExecutionHandler rejectedExecutionHandler =
151                                    new CallerRunsPolicy() {
152    
153                                            @Override
154                                            public void rejectedExecution(
155                                                    Runnable runnable,
156                                                    ThreadPoolExecutor threadPoolExecutor) {
157    
158                                                    if (_log.isWarnEnabled()) {
159                                                            StringBundler sb = new StringBundler(4);
160    
161                                                            sb.append("The search index writer's task queue ");
162                                                            sb.append("is at its maximum capacity. The ");
163                                                            sb.append("current thread will handle the ");
164                                                            sb.append("request.");
165    
166                                                            _log.warn(sb.toString());
167                                                    }
168    
169                                                    super.rejectedExecution(runnable, threadPoolExecutor);
170                                            }
171    
172                                    };
173    
174                            destinationConfiguration.setRejectedExecutionHandler(
175                                    rejectedExecutionHandler);
176                    }
177    
178                    return DestinationFactoryUtil.createDestination(
179                            destinationConfiguration);
180            }
181    
182            protected void destroySearchEngine(
183                    SearchEngineRegistration searchEngineRegistration) {
184    
185                    _messageBus.removeDestination(
186                            searchEngineRegistration.getSearchReaderDestinationName());
187    
188                    _messageBus.removeDestination(
189                            searchEngineRegistration.getSearchWriterDestinationName());
190    
191                    SearchEngineUtil.removeSearchEngine(
192                            searchEngineRegistration.getSearchEngineId());
193    
194                    if (!searchEngineRegistration.isOverride()) {
195                            return;
196                    }
197    
198                    SearchEngineProxyWrapper originalSearchEngineProxy =
199                            searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
200    
201                    Destination searchReaderDestination = getSearchReaderDestination(
202                            _messageBus, searchEngineRegistration.getSearchEngineId());
203    
204                    registerInvokerMessageListener(
205                            searchReaderDestination,
206                            searchEngineRegistration.getOriginalSearchReaderMessageListeners());
207    
208                    Destination searchWriterDestination = getSearchWriterDestination(
209                            _messageBus, searchEngineRegistration.getSearchEngineId());
210    
211                    registerInvokerMessageListener(
212                            searchWriterDestination,
213                            searchEngineRegistration.getOriginalSearchWriterMessageListeners());
214    
215                    SearchEngineUtil.setSearchEngine(
216                            searchEngineRegistration.getSearchEngineId(),
217                            originalSearchEngineProxy);
218            }
219    
220            protected abstract String getDefaultSearchEngineId();
221    
222            protected abstract IndexSearcher getIndexSearcher();
223    
224            protected abstract IndexWriter getIndexWriter();
225    
226            protected abstract ClassLoader getOperatingClassloader();
227    
228            protected Destination getSearchReaderDestination(
229                    MessageBus messageBus, String searchEngineId) {
230    
231                    String searchReaderDestinationName =
232                            SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
233    
234                    Destination searchReaderDestination = messageBus.getDestination(
235                            searchReaderDestinationName);
236    
237                    if (searchReaderDestination == null) {
238                            searchReaderDestination = createSearchReaderDestination(
239                                    searchReaderDestinationName);
240    
241                            messageBus.addDestination(searchReaderDestination);
242                    }
243    
244                    return searchReaderDestination;
245            }
246    
247            protected Destination getSearchWriterDestination(
248                    MessageBus messageBus, String searchEngineId) {
249    
250                    String searchWriterDestinationName =
251                            SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
252    
253                    Destination searchWriterDestination = messageBus.getDestination(
254                            searchWriterDestinationName);
255    
256                    if (searchWriterDestination == null) {
257                            searchWriterDestination = createSearchWriterDestination(
258                                    searchWriterDestinationName);
259    
260                            messageBus.addDestination(searchWriterDestination);
261                    }
262    
263                    return searchWriterDestination;
264            }
265    
266            protected void initialize() {
267                    Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
268    
269                    for (Entry<String, SearchEngine> entry : entrySet) {
270                            initSearchEngine(entry.getKey(), entry.getValue());
271                    }
272    
273                    String defaultSearchEngineId = getDefaultSearchEngineId();
274    
275                    if (Validator.isNotNull(defaultSearchEngineId)) {
276                            _originalSearchEngineId =
277                                    SearchEngineUtil.getDefaultSearchEngineId();
278    
279                            SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
280                    }
281    
282                    _searchEngines.clear();
283            }
284    
285            protected void initSearchEngine(
286                    String searchEngineId, SearchEngine searchEngine) {
287    
288                    SearchEngineRegistration searchEngineRegistration =
289                            new SearchEngineRegistration(searchEngineId);
290    
291                    _searchEngineRegistrations.add(searchEngineRegistration);
292    
293                    Destination searchReaderDestination = getSearchReaderDestination(
294                            _messageBus, searchEngineId);
295    
296                    searchEngineRegistration.setSearchReaderDestinationName(
297                            searchReaderDestination.getName());
298    
299                    Destination searchWriterDestination = getSearchWriterDestination(
300                            _messageBus, searchEngineId);
301    
302                    searchEngineRegistration.setSearchWriterDestinationName(
303                            searchWriterDestination.getName());
304    
305                    SearchEngine originalSearchEngine =
306                            SearchEngineUtil.getSearchEngineSilent(searchEngineId);
307    
308                    if (originalSearchEngine != null) {
309                            searchEngineRegistration.setOverride(true);
310    
311                            searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
312                                    (SearchEngineProxyWrapper)originalSearchEngine);
313    
314                            savePreviousSearchEngineListeners(
315                                    searchReaderDestination, searchWriterDestination,
316                                    searchEngineRegistration);
317    
318                            _messageBus.removeDestination(
319                                    searchReaderDestination.getName(), false);
320    
321                            searchReaderDestination = getSearchReaderDestination(
322                                    _messageBus, searchEngineId);
323    
324                            _messageBus.removeDestination(
325                                    searchWriterDestination.getName(), false);
326    
327                            searchWriterDestination = getSearchWriterDestination(
328                                    _messageBus, searchEngineId);
329                    }
330    
331                    createSearchEngineListeners(
332                            searchEngineId, searchEngine, searchReaderDestination,
333                            searchWriterDestination);
334    
335                    SearchEngineProxyWrapper searchEngineProxyWrapper =
336                            new SearchEngineProxyWrapper(
337                                    searchEngine, getIndexSearcher(), getIndexWriter());
338    
339                    SearchEngineUtil.setSearchEngine(
340                            searchEngineId, searchEngineProxyWrapper);
341            }
342    
343            protected void registerInvokerMessageListener(
344                    Destination destination,
345                    List<InvokerMessageListener> invokerMessageListeners) {
346    
347                    for (InvokerMessageListener invokerMessageListener :
348                                    invokerMessageListeners) {
349    
350                            destination.register(
351                                    invokerMessageListener.getMessageListener(),
352                                    invokerMessageListener.getClassLoader());
353                    }
354            }
355    
356            protected void registerSearchEngineMessageListener(
357                    String searchEngineId, SearchEngine searchEngine,
358                    Destination destination,
359                    BaseSearchEngineMessageListener baseSearchEngineMessageListener,
360                    Object manager) {
361    
362                    baseSearchEngineMessageListener.setManager(manager);
363                    baseSearchEngineMessageListener.setMessageBus(_messageBus);
364                    baseSearchEngineMessageListener.setSearchEngine(searchEngine);
365                    baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
366    
367                    destination.register(
368                            baseSearchEngineMessageListener, getOperatingClassloader());
369            }
370    
371            protected void savePreviousSearchEngineListeners(
372                    Destination searchReaderDestination,
373                    Destination searchWriterDestination,
374                    SearchEngineRegistration searchEngineRegistration) {
375    
376                    Set<MessageListener> searchReaderMessageListeners =
377                            searchReaderDestination.getMessageListeners();
378    
379                    for (MessageListener searchReaderMessageListener :
380                                    searchReaderMessageListeners) {
381    
382                            InvokerMessageListener invokerMessageListener =
383                                    (InvokerMessageListener)searchReaderMessageListener;
384    
385                            searchEngineRegistration.addOriginalSearchReaderMessageListener(
386                                    invokerMessageListener);
387                    }
388    
389                    Set<MessageListener> searchWriterMessageListeners =
390                            searchWriterDestination.getMessageListeners();
391    
392                    for (MessageListener searchWriterMessageListener :
393                                    searchWriterMessageListeners) {
394    
395                            InvokerMessageListener invokerMessageListener =
396                                    (InvokerMessageListener)searchWriterMessageListener;
397    
398                            searchEngineRegistration.addOriginalSearchWriterMessageListener(
399                                    invokerMessageListener);
400                    }
401            }
402    
403            private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
404                    GetterUtil.getInteger(
405                            PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
406    
407            private static final Log _log = LogFactoryUtil.getLog(
408                    AbstractSearchEngineConfigurator.class);
409    
410            private volatile MessageBus _messageBus;
411            private String _originalSearchEngineId;
412            private final List<SearchEngineRegistration> _searchEngineRegistrations =
413                    new ArrayList<>();
414            private Map<String, SearchEngine> _searchEngines;
415    
416            private class SearchEngineRegistration {
417    
418                    public void addOriginalSearchReaderMessageListener(
419                            InvokerMessageListener messageListener) {
420    
421                            _originalSearchReaderMessageListeners.add(messageListener);
422                    }
423    
424                    public void addOriginalSearchWriterMessageListener(
425                            InvokerMessageListener messageListener) {
426    
427                            _originalSearchWriterMessageListeners.add(messageListener);
428                    }
429    
430                    public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
431                            return _originalSearchEngineProxyWrapper;
432                    }
433    
434                    public List<InvokerMessageListener>
435                            getOriginalSearchReaderMessageListeners() {
436    
437                            return _originalSearchReaderMessageListeners;
438                    }
439    
440                    public List<InvokerMessageListener>
441                            getOriginalSearchWriterMessageListeners() {
442    
443                            return _originalSearchWriterMessageListeners;
444                    }
445    
446                    public String getSearchEngineId() {
447                            return _searchEngineId;
448                    }
449    
450                    public String getSearchReaderDestinationName() {
451                            return _searchReaderDestinationName;
452                    }
453    
454                    public String getSearchWriterDestinationName() {
455                            return _searchWriterDestinationName;
456                    }
457    
458                    public boolean isOverride() {
459                            return _override;
460                    }
461    
462                    public void setOriginalSearchEngineProxyWrapper(
463                            SearchEngineProxyWrapper searchEngineProxyWrapper) {
464    
465                            _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
466                    }
467    
468                    public void setOverride(boolean override) {
469                            _override = override;
470                    }
471    
472                    public void setSearchReaderDestinationName(
473                            String searchReaderDestinationName) {
474    
475                            _searchReaderDestinationName = searchReaderDestinationName;
476                    }
477    
478                    public void setSearchWriterDestinationName(
479                            String searchWriterDestinationName) {
480    
481                            _searchWriterDestinationName = searchWriterDestinationName;
482                    }
483    
484                    private SearchEngineRegistration(String searchEngineId) {
485                            _searchEngineId = searchEngineId;
486                    }
487    
488                    private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
489                    private final List<InvokerMessageListener>
490                            _originalSearchReaderMessageListeners = new ArrayList<>();
491                    private final List<InvokerMessageListener>
492                            _originalSearchWriterMessageListeners = new ArrayList<>();
493                    private boolean _override;
494                    private final String _searchEngineId;
495                    private String _searchReaderDestinationName;
496                    private String _searchWriterDestinationName;
497    
498            }
499    
500    }