001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.search;
016    
017    import com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener;
018    import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
019    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Destination;
024    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
025    import com.liferay.portal.kernel.messaging.MessageBus;
026    import com.liferay.portal.kernel.messaging.MessageListener;
027    import com.liferay.portal.kernel.messaging.ParallelDestination;
028    import com.liferay.portal.kernel.messaging.SynchronousDestination;
029    import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030    import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031    import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032    import com.liferay.portal.kernel.util.GetterUtil;
033    import com.liferay.portal.kernel.util.PropsKeys;
034    import com.liferay.portal.kernel.util.PropsUtil;
035    import com.liferay.portal.kernel.util.StringBundler;
036    import com.liferay.portal.kernel.util.Validator;
037    
038    import java.util.ArrayList;
039    import java.util.List;
040    import java.util.Map;
041    import java.util.Map.Entry;
042    import java.util.Set;
043    
044    /**
045     * @author Michael C. Han
046     */
047    public abstract class AbstractSearchEngineConfigurator
048            implements SearchEngineConfigurator {
049    
050            @Override
051            public void afterPropertiesSet() {
052                    Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
053    
054                    for (Entry<String, SearchEngine> entry : entrySet) {
055                            initSearchEngine(entry.getKey(), entry.getValue());
056                    }
057    
058                    String defaultSearchEngineId = getDefaultSearchEngineId();
059    
060                    if (Validator.isNotNull(defaultSearchEngineId)) {
061                            _originalSearchEngineId =
062                                    SearchEngineUtil.getDefaultSearchEngineId();
063    
064                            SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
065                    }
066    
067                    _searchEngines.clear();
068            }
069    
070            @Override
071            public void destroy() {
072                    for (SearchEngineRegistration searchEngineRegistration :
073                                    _searchEngineRegistrations) {
074    
075                            destroySearchEngine(searchEngineRegistration);
076                    }
077    
078                    _searchEngineRegistrations.clear();
079    
080                    if (Validator.isNotNull(_originalSearchEngineId)) {
081                            SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
082    
083                            _originalSearchEngineId = null;
084                    }
085            }
086    
087            @Override
088            public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
089                    _searchEngines = searchEngines;
090            }
091    
092            protected void createSearchEngineListeners(
093                    String searchEngineId, SearchEngine searchEngine,
094                    Destination searchReaderDestination,
095                    Destination searchWriterDestination) {
096    
097                    registerSearchEngineMessageListener(
098                            searchEngineId, searchEngine, searchReaderDestination,
099                            new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
100    
101                    registerSearchEngineMessageListener(
102                            searchEngineId, searchEngine, searchWriterDestination,
103                            new SearchWriterMessageListener(), searchEngine.getIndexWriter());
104    
105                    if (searchEngine.isClusteredWrite()) {
106                            ClusterBridgeMessageListener clusterBridgeMessageListener =
107                                    new ClusterBridgeMessageListener();
108    
109                            clusterBridgeMessageListener.setPriority(
110                                    searchEngine.getClusteredWritePriority());
111    
112                            searchWriterDestination.register(clusterBridgeMessageListener);
113                    }
114            }
115    
116            protected Destination createSearchReaderDestination(
117                    String searchReaderDestinationName) {
118    
119                    SynchronousDestination synchronousDestination =
120                            new SynchronousDestination();
121    
122                    synchronousDestination.setName(searchReaderDestinationName);
123    
124                    return synchronousDestination;
125            }
126    
127            protected Destination createSearchWriterDestination(
128                    String searchWriterDestinationName) {
129    
130                    ParallelDestination parallelDestination = new ParallelDestination();
131    
132                    parallelDestination.setName(searchWriterDestinationName);
133    
134                    if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
135                            parallelDestination.setMaximumQueueSize(
136                                    _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
137    
138                            RejectedExecutionHandler rejectedExecutionHandler =
139                                    new CallerRunsPolicy() {
140    
141                                            @Override
142                                            public void rejectedExecution(
143                                                    Runnable runnable,
144                                                    ThreadPoolExecutor threadPoolExecutor) {
145    
146                                                    if (_log.isWarnEnabled()) {
147                                                            StringBundler sb = new StringBundler(4);
148    
149                                                            sb.append("The search index writer's task queue ");
150                                                            sb.append("is at its maximum capacity. The ");
151                                                            sb.append("current thread will handle the ");
152                                                            sb.append("request.");
153    
154                                                            _log.warn(sb.toString());
155                                                    }
156    
157                                                    super.rejectedExecution(runnable, threadPoolExecutor);
158                                            }
159    
160                                    };
161    
162                            parallelDestination.setRejectedExecutionHandler(
163                                    rejectedExecutionHandler);
164                    }
165    
166                    return parallelDestination;
167            }
168    
169            protected void destroySearchEngine(
170                    SearchEngineRegistration searchEngineRegistration) {
171    
172                    MessageBus messageBus = getMessageBus();
173    
174                    Destination searchReaderDestination = messageBus.removeDestination(
175                            searchEngineRegistration.getSearchReaderDestinationName());
176    
177                    searchReaderDestination.close(true);
178    
179                    Destination searchWriterDestination = messageBus.removeDestination(
180                            searchEngineRegistration.getSearchWriterDestinationName());
181    
182                    searchWriterDestination.close(true);
183    
184                    SearchEngineUtil.removeSearchEngine(
185                            searchEngineRegistration.getSearchEngineId());
186    
187                    if (!searchEngineRegistration.isOverride()) {
188                            return;
189                    }
190    
191                    SearchEngineProxyWrapper originalSearchEngineProxy =
192                            searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
193    
194                    SearchEngine originalSearchEngine =
195                            originalSearchEngineProxy.getSearchEngine();
196    
197                    searchReaderDestination = getSearchReaderDestination(
198                            messageBus, searchEngineRegistration.getSearchEngineId(),
199                            originalSearchEngine);
200    
201                    registerInvokerMessageListener(
202                            searchReaderDestination,
203                            searchEngineRegistration.getOriginalSearchReaderMessageListeners());
204    
205                    searchWriterDestination = getSearchWriterDestination(
206                            messageBus, searchEngineRegistration.getSearchEngineId(),
207                            originalSearchEngine);
208    
209                    registerInvokerMessageListener(
210                            searchWriterDestination,
211                            searchEngineRegistration.getOriginalSearchWriterMessageListeners());
212    
213                    SearchEngineUtil.setSearchEngine(
214                            searchEngineRegistration.getSearchEngineId(),
215                            originalSearchEngineProxy);
216            }
217    
218            protected abstract String getDefaultSearchEngineId();
219    
220            protected abstract IndexSearcher getIndexSearcher();
221    
222            protected abstract IndexWriter getIndexWriter();
223    
224            protected abstract MessageBus getMessageBus();
225    
226            protected abstract ClassLoader getOperatingClassloader();
227    
228            protected Destination getSearchReaderDestination(
229                    MessageBus messageBus, String searchEngineId,
230                    SearchEngine searchEngine) {
231    
232                    String searchReaderDestinationName =
233                            SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
234    
235                    Destination searchReaderDestination = messageBus.getDestination(
236                            searchReaderDestinationName);
237    
238                    if (searchReaderDestination == null) {
239                            searchReaderDestination = createSearchReaderDestination(
240                                    searchReaderDestinationName);
241    
242                            searchReaderDestination.open();
243    
244                            messageBus.addDestination(searchReaderDestination);
245                    }
246    
247                    return searchReaderDestination;
248            }
249    
250            protected Destination getSearchWriterDestination(
251                    MessageBus messageBus, String searchEngineId,
252                    SearchEngine searchEngine) {
253    
254                    String searchWriterDestinationName =
255                            SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
256    
257                    Destination searchWriterDestination = messageBus.getDestination(
258                            searchWriterDestinationName);
259    
260                    if (searchWriterDestination == null) {
261                            searchWriterDestination = createSearchWriterDestination(
262                                    searchWriterDestinationName);
263    
264                            searchWriterDestination.open();
265    
266                            messageBus.addDestination(searchWriterDestination);
267                    }
268    
269                    return searchWriterDestination;
270            }
271    
272            protected void initSearchEngine(
273                    String searchEngineId, SearchEngine searchEngine) {
274    
275                    SearchEngineRegistration searchEngineRegistration =
276                            new SearchEngineRegistration(searchEngineId);
277    
278                    _searchEngineRegistrations.add(searchEngineRegistration);
279    
280                    MessageBus messageBus = getMessageBus();
281    
282                    Destination searchReaderDestination = getSearchReaderDestination(
283                            messageBus, searchEngineId, searchEngine);
284    
285                    searchEngineRegistration.setSearchReaderDestinationName(
286                            searchReaderDestination.getName());
287    
288                    Destination searchWriterDestination = getSearchWriterDestination(
289                            messageBus, searchEngineId, searchEngine);
290    
291                    searchEngineRegistration.setSearchWriterDestinationName(
292                            searchWriterDestination.getName());
293    
294                    SearchEngine originalSearchEngine =
295                            SearchEngineUtil.getSearchEngineSilent(searchEngineId);
296    
297                    if (originalSearchEngine != null) {
298                            searchEngineRegistration.setOverride(true);
299    
300                            searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
301                                    (SearchEngineProxyWrapper)originalSearchEngine);
302    
303                            savePreviousSearchEngineListeners(
304                                    searchReaderDestination, searchWriterDestination,
305                                    searchEngineRegistration);
306    
307                            messageBus.removeDestination(searchReaderDestination.getName());
308    
309                            searchReaderDestination = getSearchReaderDestination(
310                                    messageBus, searchEngineId, originalSearchEngine);
311    
312                            messageBus.removeDestination(searchWriterDestination.getName());
313    
314                            searchWriterDestination = getSearchWriterDestination(
315                                    messageBus, searchEngineId, originalSearchEngine);
316                    }
317    
318                    createSearchEngineListeners(
319                            searchEngineId, searchEngine, searchReaderDestination,
320                            searchWriterDestination);
321    
322                    SearchEngineProxyWrapper searchEngineProxyWrapper =
323                            new SearchEngineProxyWrapper(
324                                    searchEngine, getIndexSearcher(), getIndexWriter());
325    
326                    SearchEngineUtil.setSearchEngine(
327                            searchEngineId, searchEngineProxyWrapper);
328            }
329    
330            protected void registerInvokerMessageListener(
331                    Destination destination,
332                    List<InvokerMessageListener> invokerMessageListeners) {
333    
334                    for (InvokerMessageListener invokerMessageListener :
335                                    invokerMessageListeners) {
336    
337                            destination.register(
338                                    invokerMessageListener.getMessageListener(),
339                                    invokerMessageListener.getClassLoader());
340                    }
341            }
342    
343            protected void registerSearchEngineMessageListener(
344                    String searchEngineId, SearchEngine searchEngine,
345                    Destination destination,
346                    BaseSearchEngineMessageListener baseSearchEngineMessageListener,
347                    Object manager) {
348    
349                    baseSearchEngineMessageListener.setManager(manager);
350                    baseSearchEngineMessageListener.setMessageBus(getMessageBus());
351                    baseSearchEngineMessageListener.setSearchEngine(searchEngine);
352                    baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
353    
354                    destination.register(
355                            baseSearchEngineMessageListener, getOperatingClassloader());
356            }
357    
358            protected void savePreviousSearchEngineListeners(
359                    Destination searchReaderDestination,
360                    Destination searchWriterDestination,
361                    SearchEngineRegistration searchEngineRegistration) {
362    
363                    Set<MessageListener> searchReaderMessageListeners =
364                            searchReaderDestination.getMessageListeners();
365    
366                    for (MessageListener searchReaderMessageListener :
367                                    searchReaderMessageListeners) {
368    
369                            InvokerMessageListener invokerMessageListener =
370                                    (InvokerMessageListener)searchReaderMessageListener;
371    
372                            searchEngineRegistration.addOriginalSearchReaderMessageListener(
373                                    invokerMessageListener);
374                    }
375    
376                    Set<MessageListener> searchWriterMessageListeners =
377                            searchWriterDestination.getMessageListeners();
378    
379                    for (MessageListener searchWriterMessageListener :
380                                    searchWriterMessageListeners) {
381    
382                            InvokerMessageListener invokerMessageListener =
383                                    (InvokerMessageListener)searchWriterMessageListener;
384    
385                            searchEngineRegistration.addOriginalSearchWriterMessageListener(
386                                    invokerMessageListener);
387                    }
388            }
389    
390            private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
391                    GetterUtil.getInteger(
392                            PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
393    
394            private static final Log _log = LogFactoryUtil.getLog(
395                    AbstractSearchEngineConfigurator.class);
396    
397            private String _originalSearchEngineId;
398            private final List<SearchEngineRegistration> _searchEngineRegistrations =
399                    new ArrayList<SearchEngineRegistration>();
400            private Map<String, SearchEngine> _searchEngines;
401    
402            private class SearchEngineRegistration {
403    
404                    public void addOriginalSearchReaderMessageListener(
405                            InvokerMessageListener messageListener) {
406    
407                            _originalSearchReaderMessageListeners.add(messageListener);
408                    }
409    
410                    public void addOriginalSearchWriterMessageListener(
411                            InvokerMessageListener messageListener) {
412    
413                            _originalSearchWriterMessageListeners.add(messageListener);
414                    }
415    
416                    public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
417                            return _originalSearchEngineProxyWrapper;
418                    }
419    
420                    public List<InvokerMessageListener>
421                            getOriginalSearchReaderMessageListeners() {
422    
423                            return _originalSearchReaderMessageListeners;
424                    }
425    
426                    public List<InvokerMessageListener>
427                            getOriginalSearchWriterMessageListeners() {
428    
429                            return _originalSearchWriterMessageListeners;
430                    }
431    
432                    public String getSearchEngineId() {
433                            return _searchEngineId;
434                    }
435    
436                    public String getSearchReaderDestinationName() {
437                            return _searchReaderDestinationName;
438                    }
439    
440                    public String getSearchWriterDestinationName() {
441                            return _searchWriterDestinationName;
442                    }
443    
444                    public boolean isOverride() {
445                            return _override;
446                    }
447    
448                    public void setOriginalSearchEngineProxyWrapper(
449                            SearchEngineProxyWrapper searchEngineProxyWrapper) {
450    
451                            _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
452                    }
453    
454                    public void setOverride(boolean override) {
455                            _override = override;
456                    }
457    
458                    public void setSearchReaderDestinationName(
459                            String searchReaderDestinationName) {
460    
461                            _searchReaderDestinationName = searchReaderDestinationName;
462                    }
463    
464                    public void setSearchWriterDestinationName(
465                            String searchWriterDestinationName) {
466    
467                            _searchWriterDestinationName = searchWriterDestinationName;
468                    }
469    
470                    private SearchEngineRegistration(String searchEngineId) {
471                            _searchEngineId = searchEngineId;
472                    }
473    
474                    private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
475                    private final List<InvokerMessageListener>
476                            _originalSearchReaderMessageListeners =
477                                    new ArrayList<InvokerMessageListener>();
478                    private final List<InvokerMessageListener>
479                            _originalSearchWriterMessageListeners =
480                                    new ArrayList<InvokerMessageListener>();
481                    private boolean _override;
482                    private final String _searchEngineId;
483                    private String _searchReaderDestinationName;
484                    private String _searchWriterDestinationName;
485    
486            }
487    
488    }