001
014
015 package com.liferay.portal.kernel.search;
016
017 import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Destination;
023 import com.liferay.portal.kernel.messaging.DestinationConfiguration;
024 import com.liferay.portal.kernel.messaging.DestinationFactory;
025 import com.liferay.portal.kernel.messaging.DestinationFactoryUtil;
026 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageListener;
029 import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030 import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031 import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032 import com.liferay.portal.kernel.util.GetterUtil;
033 import com.liferay.portal.kernel.util.PortalRunMode;
034 import com.liferay.portal.kernel.util.PropsKeys;
035 import com.liferay.portal.kernel.util.PropsUtil;
036 import com.liferay.portal.kernel.util.StringBundler;
037 import com.liferay.portal.kernel.util.Validator;
038 import com.liferay.registry.Registry;
039 import com.liferay.registry.RegistryUtil;
040 import com.liferay.registry.dependency.ServiceDependencyListener;
041 import com.liferay.registry.dependency.ServiceDependencyManager;
042
043 import java.util.ArrayList;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049
052 public abstract class AbstractSearchEngineConfigurator
053 implements SearchEngineConfigurator {
054
055 @Override
056 public void afterPropertiesSet() {
057 final ServiceDependencyManager serviceDependencyManager =
058 new ServiceDependencyManager();
059
060 serviceDependencyManager.addServiceDependencyListener(
061 new ServiceDependencyListener() {
062
063 @Override
064 public void dependenciesFulfilled() {
065 Registry registry = RegistryUtil.getRegistry();
066
067 _messageBus = registry.getService(MessageBus.class);
068
069 initialize();
070 }
071
072 @Override
073 public void destroy() {
074 }
075
076 });
077
078 serviceDependencyManager.registerDependencies(
079 DestinationFactory.class, MessageBus.class);
080 }
081
082 @Override
083 public void destroy() {
084 for (SearchEngineRegistration searchEngineRegistration :
085 _searchEngineRegistrations) {
086
087 destroySearchEngine(searchEngineRegistration);
088 }
089
090 _searchEngineRegistrations.clear();
091
092 if (Validator.isNotNull(_originalSearchEngineId)) {
093 SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
094
095 _originalSearchEngineId = null;
096 }
097 }
098
099 @Override
100 public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
101 _searchEngines = searchEngines;
102 }
103
104 protected void createSearchEngineListeners(
105 String searchEngineId, SearchEngine searchEngine,
106 Destination searchReaderDestination,
107 Destination searchWriterDestination) {
108
109 registerSearchEngineMessageListener(
110 searchEngineId, searchEngine, searchReaderDestination,
111 new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
112
113 registerSearchEngineMessageListener(
114 searchEngineId, searchEngine, searchWriterDestination,
115 new SearchWriterMessageListener(), searchEngine.getIndexWriter());
116 }
117
118 protected Destination createSearchReaderDestination(
119 String searchReaderDestinationName) {
120
121 DestinationConfiguration destinationConfiguration =
122 DestinationConfiguration.createSynchronousDestinationConfiguration(
123 searchReaderDestinationName);
124
125 return DestinationFactoryUtil.createDestination(
126 destinationConfiguration);
127 }
128
129 protected Destination createSearchWriterDestination(
130 String searchWriterDestinationName) {
131
132 DestinationConfiguration destinationConfiguration = null;
133
134 if (PortalRunMode.isTestMode()) {
135 destinationConfiguration =
136 DestinationConfiguration.
137 createSynchronousDestinationConfiguration(
138 searchWriterDestinationName);
139 }
140 else {
141 destinationConfiguration =
142 DestinationConfiguration.createParallelDestinationConfiguration(
143 searchWriterDestinationName);
144 }
145
146 if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
147 destinationConfiguration.setMaximumQueueSize(
148 _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
149
150 RejectedExecutionHandler rejectedExecutionHandler =
151 new CallerRunsPolicy() {
152
153 @Override
154 public void rejectedExecution(
155 Runnable runnable,
156 ThreadPoolExecutor threadPoolExecutor) {
157
158 if (_log.isWarnEnabled()) {
159 StringBundler sb = new StringBundler(4);
160
161 sb.append("The search index writer's task queue ");
162 sb.append("is at its maximum capacity. The ");
163 sb.append("current thread will handle the ");
164 sb.append("request.");
165
166 _log.warn(sb.toString());
167 }
168
169 super.rejectedExecution(runnable, threadPoolExecutor);
170 }
171
172 };
173
174 destinationConfiguration.setRejectedExecutionHandler(
175 rejectedExecutionHandler);
176 }
177
178 return DestinationFactoryUtil.createDestination(
179 destinationConfiguration);
180 }
181
182 protected void destroySearchEngine(
183 SearchEngineRegistration searchEngineRegistration) {
184
185 _messageBus.removeDestination(
186 searchEngineRegistration.getSearchReaderDestinationName());
187
188 _messageBus.removeDestination(
189 searchEngineRegistration.getSearchWriterDestinationName());
190
191 SearchEngineUtil.removeSearchEngine(
192 searchEngineRegistration.getSearchEngineId());
193
194 if (!searchEngineRegistration.isOverride()) {
195 return;
196 }
197
198 SearchEngineProxyWrapper originalSearchEngineProxy =
199 searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
200
201 Destination searchReaderDestination = getSearchReaderDestination(
202 _messageBus, searchEngineRegistration.getSearchEngineId());
203
204 registerInvokerMessageListener(
205 searchReaderDestination,
206 searchEngineRegistration.getOriginalSearchReaderMessageListeners());
207
208 Destination searchWriterDestination = getSearchWriterDestination(
209 _messageBus, searchEngineRegistration.getSearchEngineId());
210
211 registerInvokerMessageListener(
212 searchWriterDestination,
213 searchEngineRegistration.getOriginalSearchWriterMessageListeners());
214
215 SearchEngineUtil.setSearchEngine(
216 searchEngineRegistration.getSearchEngineId(),
217 originalSearchEngineProxy);
218 }
219
220 protected abstract String getDefaultSearchEngineId();
221
222 protected abstract IndexSearcher getIndexSearcher();
223
224 protected abstract IndexWriter getIndexWriter();
225
226 protected abstract ClassLoader getOperatingClassloader();
227
228 protected Destination getSearchReaderDestination(
229 MessageBus messageBus, String searchEngineId) {
230
231 String searchReaderDestinationName =
232 SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
233
234 Destination searchReaderDestination = messageBus.getDestination(
235 searchReaderDestinationName);
236
237 if (searchReaderDestination == null) {
238 searchReaderDestination = createSearchReaderDestination(
239 searchReaderDestinationName);
240
241 messageBus.addDestination(searchReaderDestination);
242 }
243
244 return searchReaderDestination;
245 }
246
247 protected Destination getSearchWriterDestination(
248 MessageBus messageBus, String searchEngineId) {
249
250 String searchWriterDestinationName =
251 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
252
253 Destination searchWriterDestination = messageBus.getDestination(
254 searchWriterDestinationName);
255
256 if (searchWriterDestination == null) {
257 searchWriterDestination = createSearchWriterDestination(
258 searchWriterDestinationName);
259
260 messageBus.addDestination(searchWriterDestination);
261 }
262
263 return searchWriterDestination;
264 }
265
266 protected void initialize() {
267 Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
268
269 for (Entry<String, SearchEngine> entry : entrySet) {
270 initSearchEngine(entry.getKey(), entry.getValue());
271 }
272
273 String defaultSearchEngineId = getDefaultSearchEngineId();
274
275 if (Validator.isNotNull(defaultSearchEngineId)) {
276 _originalSearchEngineId =
277 SearchEngineUtil.getDefaultSearchEngineId();
278
279 SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
280 }
281
282 _searchEngines.clear();
283 }
284
285 protected void initSearchEngine(
286 String searchEngineId, SearchEngine searchEngine) {
287
288 SearchEngineRegistration searchEngineRegistration =
289 new SearchEngineRegistration(searchEngineId);
290
291 _searchEngineRegistrations.add(searchEngineRegistration);
292
293 Destination searchReaderDestination = getSearchReaderDestination(
294 _messageBus, searchEngineId);
295
296 searchEngineRegistration.setSearchReaderDestinationName(
297 searchReaderDestination.getName());
298
299 Destination searchWriterDestination = getSearchWriterDestination(
300 _messageBus, searchEngineId);
301
302 searchEngineRegistration.setSearchWriterDestinationName(
303 searchWriterDestination.getName());
304
305 SearchEngine originalSearchEngine =
306 SearchEngineUtil.getSearchEngineSilent(searchEngineId);
307
308 if (originalSearchEngine != null) {
309 searchEngineRegistration.setOverride(true);
310
311 searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
312 (SearchEngineProxyWrapper)originalSearchEngine);
313
314 savePreviousSearchEngineListeners(
315 searchReaderDestination, searchWriterDestination,
316 searchEngineRegistration);
317
318 _messageBus.removeDestination(
319 searchReaderDestination.getName(), false);
320
321 searchReaderDestination = getSearchReaderDestination(
322 _messageBus, searchEngineId);
323
324 _messageBus.removeDestination(
325 searchWriterDestination.getName(), false);
326
327 searchWriterDestination = getSearchWriterDestination(
328 _messageBus, searchEngineId);
329 }
330
331 createSearchEngineListeners(
332 searchEngineId, searchEngine, searchReaderDestination,
333 searchWriterDestination);
334
335 SearchEngineProxyWrapper searchEngineProxyWrapper =
336 new SearchEngineProxyWrapper(
337 searchEngine, getIndexSearcher(), getIndexWriter());
338
339 SearchEngineUtil.setSearchEngine(
340 searchEngineId, searchEngineProxyWrapper);
341 }
342
343 protected void registerInvokerMessageListener(
344 Destination destination,
345 List<InvokerMessageListener> invokerMessageListeners) {
346
347 for (InvokerMessageListener invokerMessageListener :
348 invokerMessageListeners) {
349
350 destination.register(
351 invokerMessageListener.getMessageListener(),
352 invokerMessageListener.getClassLoader());
353 }
354 }
355
356 protected void registerSearchEngineMessageListener(
357 String searchEngineId, SearchEngine searchEngine,
358 Destination destination,
359 BaseSearchEngineMessageListener baseSearchEngineMessageListener,
360 Object manager) {
361
362 baseSearchEngineMessageListener.setManager(manager);
363 baseSearchEngineMessageListener.setMessageBus(_messageBus);
364 baseSearchEngineMessageListener.setSearchEngine(searchEngine);
365 baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
366
367 destination.register(
368 baseSearchEngineMessageListener, getOperatingClassloader());
369 }
370
371 protected void savePreviousSearchEngineListeners(
372 Destination searchReaderDestination,
373 Destination searchWriterDestination,
374 SearchEngineRegistration searchEngineRegistration) {
375
376 Set<MessageListener> searchReaderMessageListeners =
377 searchReaderDestination.getMessageListeners();
378
379 for (MessageListener searchReaderMessageListener :
380 searchReaderMessageListeners) {
381
382 InvokerMessageListener invokerMessageListener =
383 (InvokerMessageListener)searchReaderMessageListener;
384
385 searchEngineRegistration.addOriginalSearchReaderMessageListener(
386 invokerMessageListener);
387 }
388
389 Set<MessageListener> searchWriterMessageListeners =
390 searchWriterDestination.getMessageListeners();
391
392 for (MessageListener searchWriterMessageListener :
393 searchWriterMessageListeners) {
394
395 InvokerMessageListener invokerMessageListener =
396 (InvokerMessageListener)searchWriterMessageListener;
397
398 searchEngineRegistration.addOriginalSearchWriterMessageListener(
399 invokerMessageListener);
400 }
401 }
402
403 private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
404 GetterUtil.getInteger(
405 PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
406
407 private static final Log _log = LogFactoryUtil.getLog(
408 AbstractSearchEngineConfigurator.class);
409
410 private volatile MessageBus _messageBus;
411 private String _originalSearchEngineId;
412 private final List<SearchEngineRegistration> _searchEngineRegistrations =
413 new ArrayList<>();
414 private Map<String, SearchEngine> _searchEngines;
415
416 private class SearchEngineRegistration {
417
418 public void addOriginalSearchReaderMessageListener(
419 InvokerMessageListener messageListener) {
420
421 _originalSearchReaderMessageListeners.add(messageListener);
422 }
423
424 public void addOriginalSearchWriterMessageListener(
425 InvokerMessageListener messageListener) {
426
427 _originalSearchWriterMessageListeners.add(messageListener);
428 }
429
430 public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
431 return _originalSearchEngineProxyWrapper;
432 }
433
434 public List<InvokerMessageListener>
435 getOriginalSearchReaderMessageListeners() {
436
437 return _originalSearchReaderMessageListeners;
438 }
439
440 public List<InvokerMessageListener>
441 getOriginalSearchWriterMessageListeners() {
442
443 return _originalSearchWriterMessageListeners;
444 }
445
446 public String getSearchEngineId() {
447 return _searchEngineId;
448 }
449
450 public String getSearchReaderDestinationName() {
451 return _searchReaderDestinationName;
452 }
453
454 public String getSearchWriterDestinationName() {
455 return _searchWriterDestinationName;
456 }
457
458 public boolean isOverride() {
459 return _override;
460 }
461
462 public void setOriginalSearchEngineProxyWrapper(
463 SearchEngineProxyWrapper searchEngineProxyWrapper) {
464
465 _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
466 }
467
468 public void setOverride(boolean override) {
469 _override = override;
470 }
471
472 public void setSearchReaderDestinationName(
473 String searchReaderDestinationName) {
474
475 _searchReaderDestinationName = searchReaderDestinationName;
476 }
477
478 public void setSearchWriterDestinationName(
479 String searchWriterDestinationName) {
480
481 _searchWriterDestinationName = searchWriterDestinationName;
482 }
483
484 private SearchEngineRegistration(String searchEngineId) {
485 _searchEngineId = searchEngineId;
486 }
487
488 private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
489 private final List<InvokerMessageListener>
490 _originalSearchReaderMessageListeners = new ArrayList<>();
491 private final List<InvokerMessageListener>
492 _originalSearchWriterMessageListeners = new ArrayList<>();
493 private boolean _override;
494 private final String _searchEngineId;
495 private String _searchReaderDestinationName;
496 private String _searchWriterDestinationName;
497
498 }
499
500 }