001
014
015 package com.liferay.portal.kernel.search;
016
017 import com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener;
018 import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
019 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Destination;
024 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
025 import com.liferay.portal.kernel.messaging.MessageBus;
026 import com.liferay.portal.kernel.messaging.MessageListener;
027 import com.liferay.portal.kernel.messaging.ParallelDestination;
028 import com.liferay.portal.kernel.messaging.SynchronousDestination;
029 import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030 import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031 import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032 import com.liferay.portal.kernel.util.GetterUtil;
033 import com.liferay.portal.kernel.util.PropsKeys;
034 import com.liferay.portal.kernel.util.PropsUtil;
035 import com.liferay.portal.kernel.util.StringBundler;
036 import com.liferay.portal.kernel.util.Validator;
037
038 import java.util.ArrayList;
039 import java.util.List;
040 import java.util.Map;
041 import java.util.Map.Entry;
042 import java.util.Set;
043
044
047 public abstract class AbstractSearchEngineConfigurator
048 implements SearchEngineConfigurator {
049
050 @Override
051 public void afterPropertiesSet() {
052 Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
053
054 for (Entry<String, SearchEngine> entry : entrySet) {
055 initSearchEngine(entry.getKey(), entry.getValue());
056 }
057
058 String defaultSearchEngineId = getDefaultSearchEngineId();
059
060 if (Validator.isNotNull(defaultSearchEngineId)) {
061 _originalSearchEngineId =
062 SearchEngineUtil.getDefaultSearchEngineId();
063
064 SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
065 }
066
067 _searchEngines.clear();
068 }
069
070 @Override
071 public void destroy() {
072 for (SearchEngineRegistration searchEngineRegistration :
073 _searchEngineRegistrations) {
074
075 destroySearchEngine(searchEngineRegistration);
076 }
077
078 _searchEngineRegistrations.clear();
079
080 if (Validator.isNotNull(_originalSearchEngineId)) {
081 SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
082
083 _originalSearchEngineId = null;
084 }
085 }
086
087 @Override
088 public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
089 _searchEngines = searchEngines;
090 }
091
092 protected void createSearchEngineListeners(
093 String searchEngineId, SearchEngine searchEngine,
094 Destination searchReaderDestination,
095 Destination searchWriterDestination) {
096
097 registerSearchEngineMessageListener(
098 searchEngineId, searchEngine, searchReaderDestination,
099 new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
100
101 registerSearchEngineMessageListener(
102 searchEngineId, searchEngine, searchWriterDestination,
103 new SearchWriterMessageListener(), searchEngine.getIndexWriter());
104
105 if (searchEngine.isClusteredWrite()) {
106 ClusterBridgeMessageListener clusterBridgeMessageListener =
107 new ClusterBridgeMessageListener();
108
109 clusterBridgeMessageListener.setPriority(
110 searchEngine.getClusteredWritePriority());
111
112 searchWriterDestination.register(clusterBridgeMessageListener);
113 }
114 }
115
116 protected Destination createSearchReaderDestination(
117 String searchReaderDestinationName) {
118
119 SynchronousDestination synchronousDestination =
120 new SynchronousDestination();
121
122 synchronousDestination.setName(searchReaderDestinationName);
123
124 return synchronousDestination;
125 }
126
127 protected Destination createSearchWriterDestination(
128 String searchWriterDestinationName) {
129
130 ParallelDestination parallelDestination = new ParallelDestination();
131
132 parallelDestination.setName(searchWriterDestinationName);
133
134 if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
135 parallelDestination.setMaximumQueueSize(
136 _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
137
138 RejectedExecutionHandler rejectedExecutionHandler =
139 new CallerRunsPolicy() {
140
141 @Override
142 public void rejectedExecution(
143 Runnable runnable,
144 ThreadPoolExecutor threadPoolExecutor) {
145
146 if (_log.isWarnEnabled()) {
147 StringBundler sb = new StringBundler(4);
148
149 sb.append("The search index writer's task queue ");
150 sb.append("is at its maximum capacity. The ");
151 sb.append("current thread will handle the ");
152 sb.append("request.");
153
154 _log.warn(sb.toString());
155 }
156
157 super.rejectedExecution(runnable, threadPoolExecutor);
158 }
159
160 };
161
162 parallelDestination.setRejectedExecutionHandler(
163 rejectedExecutionHandler);
164 }
165
166 return parallelDestination;
167 }
168
169 protected void destroySearchEngine(
170 SearchEngineRegistration searchEngineRegistration) {
171
172 MessageBus messageBus = getMessageBus();
173
174 Destination searchReaderDestination = messageBus.removeDestination(
175 searchEngineRegistration.getSearchReaderDestinationName());
176
177 searchReaderDestination.close(true);
178
179 Destination searchWriterDestination = messageBus.removeDestination(
180 searchEngineRegistration.getSearchWriterDestinationName());
181
182 searchWriterDestination.close(true);
183
184 SearchEngineUtil.removeSearchEngine(
185 searchEngineRegistration.getSearchEngineId());
186
187 if (!searchEngineRegistration.isOverride()) {
188 return;
189 }
190
191 SearchEngineProxyWrapper originalSearchEngineProxy =
192 searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
193
194 SearchEngine originalSearchEngine =
195 originalSearchEngineProxy.getSearchEngine();
196
197 searchReaderDestination = getSearchReaderDestination(
198 messageBus, searchEngineRegistration.getSearchEngineId(),
199 originalSearchEngine);
200
201 registerInvokerMessageListener(
202 searchReaderDestination,
203 searchEngineRegistration.getOriginalSearchReaderMessageListeners());
204
205 searchWriterDestination = getSearchWriterDestination(
206 messageBus, searchEngineRegistration.getSearchEngineId(),
207 originalSearchEngine);
208
209 registerInvokerMessageListener(
210 searchWriterDestination,
211 searchEngineRegistration.getOriginalSearchWriterMessageListeners());
212
213 SearchEngineUtil.setSearchEngine(
214 searchEngineRegistration.getSearchEngineId(),
215 originalSearchEngineProxy);
216 }
217
218 protected abstract String getDefaultSearchEngineId();
219
220 protected abstract IndexSearcher getIndexSearcher();
221
222 protected abstract IndexWriter getIndexWriter();
223
224 protected abstract MessageBus getMessageBus();
225
226 protected abstract ClassLoader getOperatingClassloader();
227
228 protected Destination getSearchReaderDestination(
229 MessageBus messageBus, String searchEngineId,
230 SearchEngine searchEngine) {
231
232 String searchReaderDestinationName =
233 SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
234
235 Destination searchReaderDestination = messageBus.getDestination(
236 searchReaderDestinationName);
237
238 if (searchReaderDestination == null) {
239 searchReaderDestination = createSearchReaderDestination(
240 searchReaderDestinationName);
241
242 searchReaderDestination.open();
243
244 messageBus.addDestination(searchReaderDestination);
245 }
246
247 return searchReaderDestination;
248 }
249
250 protected Destination getSearchWriterDestination(
251 MessageBus messageBus, String searchEngineId,
252 SearchEngine searchEngine) {
253
254 String searchWriterDestinationName =
255 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
256
257 Destination searchWriterDestination = messageBus.getDestination(
258 searchWriterDestinationName);
259
260 if (searchWriterDestination == null) {
261 searchWriterDestination = createSearchWriterDestination(
262 searchWriterDestinationName);
263
264 searchWriterDestination.open();
265
266 messageBus.addDestination(searchWriterDestination);
267 }
268
269 return searchWriterDestination;
270 }
271
272 protected void initSearchEngine(
273 String searchEngineId, SearchEngine searchEngine) {
274
275 SearchEngineRegistration searchEngineRegistration =
276 new SearchEngineRegistration(searchEngineId);
277
278 _searchEngineRegistrations.add(searchEngineRegistration);
279
280 MessageBus messageBus = getMessageBus();
281
282 Destination searchReaderDestination = getSearchReaderDestination(
283 messageBus, searchEngineId, searchEngine);
284
285 searchEngineRegistration.setSearchReaderDestinationName(
286 searchReaderDestination.getName());
287
288 Destination searchWriterDestination = getSearchWriterDestination(
289 messageBus, searchEngineId, searchEngine);
290
291 searchEngineRegistration.setSearchWriterDestinationName(
292 searchWriterDestination.getName());
293
294 SearchEngine originalSearchEngine =
295 SearchEngineUtil.getSearchEngineSilent(searchEngineId);
296
297 if (originalSearchEngine != null) {
298 searchEngineRegistration.setOverride(true);
299
300 searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
301 (SearchEngineProxyWrapper)originalSearchEngine);
302
303 savePreviousSearchEngineListeners(
304 searchReaderDestination, searchWriterDestination,
305 searchEngineRegistration);
306
307 messageBus.removeDestination(searchReaderDestination.getName());
308
309 searchReaderDestination = getSearchReaderDestination(
310 messageBus, searchEngineId, originalSearchEngine);
311
312 messageBus.removeDestination(searchWriterDestination.getName());
313
314 searchWriterDestination = getSearchWriterDestination(
315 messageBus, searchEngineId, originalSearchEngine);
316 }
317
318 createSearchEngineListeners(
319 searchEngineId, searchEngine, searchReaderDestination,
320 searchWriterDestination);
321
322 SearchEngineProxyWrapper searchEngineProxyWrapper =
323 new SearchEngineProxyWrapper(
324 searchEngine, getIndexSearcher(), getIndexWriter());
325
326 SearchEngineUtil.setSearchEngine(
327 searchEngineId, searchEngineProxyWrapper);
328 }
329
330 protected void registerInvokerMessageListener(
331 Destination destination,
332 List<InvokerMessageListener> invokerMessageListeners) {
333
334 for (InvokerMessageListener invokerMessageListener :
335 invokerMessageListeners) {
336
337 destination.register(
338 invokerMessageListener.getMessageListener(),
339 invokerMessageListener.getClassLoader());
340 }
341 }
342
343 protected void registerSearchEngineMessageListener(
344 String searchEngineId, SearchEngine searchEngine,
345 Destination destination,
346 BaseSearchEngineMessageListener baseSearchEngineMessageListener,
347 Object manager) {
348
349 baseSearchEngineMessageListener.setManager(manager);
350 baseSearchEngineMessageListener.setMessageBus(getMessageBus());
351 baseSearchEngineMessageListener.setSearchEngine(searchEngine);
352 baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
353
354 destination.register(
355 baseSearchEngineMessageListener, getOperatingClassloader());
356 }
357
358 protected void savePreviousSearchEngineListeners(
359 Destination searchReaderDestination,
360 Destination searchWriterDestination,
361 SearchEngineRegistration searchEngineRegistration) {
362
363 Set<MessageListener> searchReaderMessageListeners =
364 searchReaderDestination.getMessageListeners();
365
366 for (MessageListener searchReaderMessageListener :
367 searchReaderMessageListeners) {
368
369 InvokerMessageListener invokerMessageListener =
370 (InvokerMessageListener)searchReaderMessageListener;
371
372 searchEngineRegistration.addOriginalSearchReaderMessageListener(
373 invokerMessageListener);
374 }
375
376 Set<MessageListener> searchWriterMessageListeners =
377 searchWriterDestination.getMessageListeners();
378
379 for (MessageListener searchWriterMessageListener :
380 searchWriterMessageListeners) {
381
382 InvokerMessageListener invokerMessageListener =
383 (InvokerMessageListener)searchWriterMessageListener;
384
385 searchEngineRegistration.addOriginalSearchWriterMessageListener(
386 invokerMessageListener);
387 }
388 }
389
390 private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
391 GetterUtil.getInteger(
392 PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
393
394 private static final Log _log = LogFactoryUtil.getLog(
395 AbstractSearchEngineConfigurator.class);
396
397 private String _originalSearchEngineId;
398 private final List<SearchEngineRegistration> _searchEngineRegistrations =
399 new ArrayList<SearchEngineRegistration>();
400 private Map<String, SearchEngine> _searchEngines;
401
402 private class SearchEngineRegistration {
403
404 public void addOriginalSearchReaderMessageListener(
405 InvokerMessageListener messageListener) {
406
407 _originalSearchReaderMessageListeners.add(messageListener);
408 }
409
410 public void addOriginalSearchWriterMessageListener(
411 InvokerMessageListener messageListener) {
412
413 _originalSearchWriterMessageListeners.add(messageListener);
414 }
415
416 public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
417 return _originalSearchEngineProxyWrapper;
418 }
419
420 public List<InvokerMessageListener>
421 getOriginalSearchReaderMessageListeners() {
422
423 return _originalSearchReaderMessageListeners;
424 }
425
426 public List<InvokerMessageListener>
427 getOriginalSearchWriterMessageListeners() {
428
429 return _originalSearchWriterMessageListeners;
430 }
431
432 public String getSearchEngineId() {
433 return _searchEngineId;
434 }
435
436 public String getSearchReaderDestinationName() {
437 return _searchReaderDestinationName;
438 }
439
440 public String getSearchWriterDestinationName() {
441 return _searchWriterDestinationName;
442 }
443
444 public boolean isOverride() {
445 return _override;
446 }
447
448 public void setOriginalSearchEngineProxyWrapper(
449 SearchEngineProxyWrapper searchEngineProxyWrapper) {
450
451 _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
452 }
453
454 public void setOverride(boolean override) {
455 _override = override;
456 }
457
458 public void setSearchReaderDestinationName(
459 String searchReaderDestinationName) {
460
461 _searchReaderDestinationName = searchReaderDestinationName;
462 }
463
464 public void setSearchWriterDestinationName(
465 String searchWriterDestinationName) {
466
467 _searchWriterDestinationName = searchWriterDestinationName;
468 }
469
470 private SearchEngineRegistration(String searchEngineId) {
471 _searchEngineId = searchEngineId;
472 }
473
474 private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
475 private final List<InvokerMessageListener>
476 _originalSearchReaderMessageListeners =
477 new ArrayList<InvokerMessageListener>();
478 private final List<InvokerMessageListener>
479 _originalSearchWriterMessageListeners =
480 new ArrayList<InvokerMessageListener>();
481 private boolean _override;
482 private final String _searchEngineId;
483 private String _searchReaderDestinationName;
484 private String _searchWriterDestinationName;
485
486 }
487
488 }