001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLink;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.messaging.Destination;
035 import com.liferay.portal.kernel.messaging.MessageBus;
036 import com.liferay.portal.kernel.messaging.MessageBusUtil;
037 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038 import com.liferay.portal.kernel.search.BooleanClauseOccur;
039 import com.liferay.portal.kernel.search.Field;
040 import com.liferay.portal.kernel.search.SearchEngineUtil;
041 import com.liferay.portal.kernel.util.ArrayUtil;
042 import com.liferay.portal.kernel.util.BasePortalLifecycle;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.MethodHandler;
045 import com.liferay.portal.kernel.util.MethodKey;
046 import com.liferay.portal.kernel.util.ObjectValuePair;
047 import com.liferay.portal.kernel.util.PortalLifecycle;
048 import com.liferay.portal.kernel.util.PropsKeys;
049 import com.liferay.portal.kernel.util.StringBundler;
050 import com.liferay.portal.kernel.util.StringPool;
051 import com.liferay.portal.kernel.util.StringUtil;
052 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053 import com.liferay.portal.kernel.util.Validator;
054 import com.liferay.portal.model.CompanyConstants;
055 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057 import com.liferay.portal.security.auth.TransientTokenUtil;
058 import com.liferay.portal.util.PortalInstances;
059 import com.liferay.portal.util.PortalUtil;
060 import com.liferay.portal.util.PropsUtil;
061 import com.liferay.portal.util.PropsValues;
062 import com.liferay.util.lucene.KeywordsUtil;
063
064 import java.io.IOException;
065 import java.io.InputStream;
066 import java.io.OutputStream;
067
068 import java.net.InetAddress;
069 import java.net.URL;
070 import java.net.URLConnection;
071
072 import java.util.HashSet;
073 import java.util.List;
074 import java.util.Map;
075 import java.util.Set;
076 import java.util.concurrent.BlockingQueue;
077 import java.util.concurrent.ConcurrentHashMap;
078 import java.util.concurrent.CountDownLatch;
079 import java.util.concurrent.TimeUnit;
080
081 import org.apache.commons.lang.time.StopWatch;
082 import org.apache.lucene.analysis.Analyzer;
083 import org.apache.lucene.analysis.TokenStream;
084 import org.apache.lucene.document.Document;
085 import org.apache.lucene.index.IndexReader;
086 import org.apache.lucene.index.Term;
087 import org.apache.lucene.queryParser.ParseException;
088 import org.apache.lucene.queryParser.QueryParser;
089 import org.apache.lucene.search.BooleanClause;
090 import org.apache.lucene.search.BooleanQuery;
091 import org.apache.lucene.search.IndexSearcher;
092 import org.apache.lucene.search.NumericRangeQuery;
093 import org.apache.lucene.search.Query;
094 import org.apache.lucene.search.TermQuery;
095 import org.apache.lucene.search.TermRangeQuery;
096 import org.apache.lucene.search.WildcardQuery;
097 import org.apache.lucene.search.highlight.Formatter;
098 import org.apache.lucene.search.highlight.Highlighter;
099 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
100 import org.apache.lucene.search.highlight.QueryScorer;
101 import org.apache.lucene.search.highlight.SimpleFragmenter;
102 import org.apache.lucene.search.highlight.WeightedTerm;
103 import org.apache.lucene.util.Version;
104
105
114 public class LuceneHelperImpl implements LuceneHelper {
115
116 @Override
117 public void addDocument(long companyId, Document document)
118 throws IOException {
119
120 IndexAccessor indexAccessor = getIndexAccessor(companyId);
121
122 indexAccessor.addDocument(document);
123 }
124
125 @Override
126 public void addExactTerm(
127 BooleanQuery booleanQuery, String field, String value) {
128
129 addTerm(booleanQuery, field, value, false);
130 }
131
132 @Override
133 public void addNumericRangeTerm(
134 BooleanQuery booleanQuery, String field, Integer startValue,
135 Integer endValue) {
136
137 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
138 field, startValue, endValue, true, true);
139
140 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
141 }
142
143 @Override
144 public void addNumericRangeTerm(
145 BooleanQuery booleanQuery, String field, Long startValue,
146 Long endValue) {
147
148 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
149 field, startValue, endValue, true, true);
150
151 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
152 }
153
154
158 @Override
159 public void addNumericRangeTerm(
160 BooleanQuery booleanQuery, String field, String startValue,
161 String endValue) {
162
163 addNumericRangeTerm(
164 booleanQuery, field, GetterUtil.getLong(startValue),
165 GetterUtil.getLong(endValue));
166 }
167
168 @Override
169 public void addRangeTerm(
170 BooleanQuery booleanQuery, String field, String startValue,
171 String endValue) {
172
173 boolean includesLower = true;
174
175 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
176 includesLower = false;
177 }
178
179 boolean includesUpper = true;
180
181 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
182 includesUpper = false;
183 }
184
185 TermRangeQuery termRangeQuery = new TermRangeQuery(
186 field, startValue, endValue, includesLower, includesUpper);
187
188 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
189 }
190
191 @Override
192 public void addRequiredTerm(
193 BooleanQuery booleanQuery, String field, String value, boolean like) {
194
195 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
196 }
197
198 @Override
199 public void addRequiredTerm(
200 BooleanQuery booleanQuery, String field, String[] values,
201 boolean like) {
202
203 if (values == null) {
204 return;
205 }
206
207 BooleanQuery query = new BooleanQuery();
208
209 for (String value : values) {
210 addTerm(query, field, value, like);
211 }
212
213 booleanQuery.add(query, BooleanClause.Occur.MUST);
214 }
215
216 @Override
217 public void addTerm(
218 BooleanQuery booleanQuery, String field, String value, boolean like) {
219
220 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
221 }
222
223 @Override
224 public void addTerm(
225 BooleanQuery booleanQuery, String field, String value, boolean like,
226 BooleanClauseOccur booleanClauseOccur) {
227
228 if (Validator.isNull(value)) {
229 return;
230 }
231
232 Analyzer analyzer = getAnalyzer();
233
234 if (analyzer instanceof PerFieldAnalyzer) {
235 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
236
237 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
238
239 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
240 like = true;
241 }
242 }
243
244 if (like) {
245 value = StringUtil.replace(
246 value, StringPool.PERCENT, StringPool.BLANK);
247 }
248
249 try {
250 QueryParser queryParser = new QueryParser(
251 getVersion(), field, analyzer);
252
253 value = StringUtil.replace(
254 value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
255
256 Query query = parseQuery(queryParser, value);
257
258 BooleanClause.Occur occur = null;
259
260 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
261 occur = BooleanClause.Occur.MUST;
262 }
263 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
264 occur = BooleanClause.Occur.MUST_NOT;
265 }
266 else {
267 occur = BooleanClause.Occur.SHOULD;
268 }
269
270 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
271 }
272 catch (BooleanQuery.TooManyClauses tmc) {
273 _log.error(
274 "The value in the portal property \"" +
275 PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE +
276 "\" is too small",
277 tmc);
278 }
279 catch (Exception e) {
280 if (_log.isWarnEnabled()) {
281 _log.warn(e, e);
282 }
283 }
284 }
285
286 @Override
287 public void addTerm(
288 BooleanQuery booleanQuery, String field, String[] values,
289 boolean like) {
290
291 for (String value : values) {
292 addTerm(booleanQuery, field, value, like);
293 }
294 }
295
296
300 @Deprecated
301 @Override
302 public void cleanUp(IndexSearcher indexSearcher) {
303 if (indexSearcher == null) {
304 return;
305 }
306
307 try {
308 indexSearcher.close();
309
310 IndexReader indexReader = indexSearcher.getIndexReader();
311
312 if (indexReader != null) {
313 indexReader.close();
314 }
315 }
316 catch (IOException ioe) {
317 _log.error(ioe, ioe);
318 }
319 }
320
321 @Override
322 public int countScoredFieldNames(Query query, String[] filedNames) {
323 int count = 0;
324
325 for (String fieldName : filedNames) {
326 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
327 query, false, fieldName);
328
329 if ((weightedTerms.length > 0) &&
330 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
331
332 count++;
333 }
334 }
335
336 return count;
337 }
338
339 @Override
340 public void delete(long companyId) {
341 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342
343 if (indexAccessor == null) {
344 return;
345 }
346
347 indexAccessor.delete();
348 }
349
350 @Override
351 public void deleteDocuments(long companyId, Term term) throws IOException {
352 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
353
354 if (indexAccessor == null) {
355 return;
356 }
357
358 indexAccessor.deleteDocuments(term);
359 }
360
361 @Override
362 public void dumpIndex(long companyId, OutputStream outputStream)
363 throws IOException {
364
365 long lastGeneration = getLastGeneration(companyId);
366
367 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
368 if (_log.isDebugEnabled()) {
369 _log.debug(
370 "Dump index from cluster is not enabled for " + companyId);
371 }
372
373 return;
374 }
375
376 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
377
378 if (indexAccessor == null) {
379 return;
380 }
381
382 indexAccessor.dumpIndex(outputStream);
383 }
384
385 @Override
386 public Analyzer getAnalyzer() {
387 return _analyzer;
388 }
389
390 @Override
391 public IndexAccessor getIndexAccessor(long companyId) {
392 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
393
394 if (indexAccessor != null) {
395 return indexAccessor;
396 }
397
398 synchronized (this) {
399 indexAccessor = _indexAccessors.get(companyId);
400
401 if (indexAccessor == null) {
402 indexAccessor = new IndexAccessorImpl(companyId);
403
404 if (isLoadIndexFromClusterEnabled()) {
405 indexAccessor = new SynchronizedIndexAccessorImpl(
406 indexAccessor);
407
408 boolean clusterForwardMessage = GetterUtil.getBoolean(
409 MessageValuesThreadLocal.getValue(
410 ClusterLink.CLUSTER_FORWARD_MESSAGE));
411
412 if (clusterForwardMessage) {
413 if (_log.isInfoEnabled()) {
414 _log.info(
415 "Skip Luncene index files cluster loading " +
416 "since this is a manual reindex request");
417 }
418 }
419 else {
420 try {
421 _loadIndexFromCluster(
422 indexAccessor,
423 indexAccessor.getLastGeneration());
424 }
425 catch (Exception e) {
426 _log.error(
427 "Unable to load index for company " +
428 indexAccessor.getCompanyId(),
429 e);
430 }
431 }
432 }
433
434 _indexAccessors.put(companyId, indexAccessor);
435 }
436 }
437
438 return indexAccessor;
439 }
440
441 @Override
442 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
443 IndexAccessor indexAccessor = getIndexAccessor(companyId);
444
445 return indexAccessor.acquireIndexSearcher();
446 }
447
448 @Override
449 public long getLastGeneration(long companyId) {
450 if (!isLoadIndexFromClusterEnabled()) {
451 return IndexAccessor.DEFAULT_LAST_GENERATION;
452 }
453
454 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
455
456 if (indexAccessor == null) {
457 return IndexAccessor.DEFAULT_LAST_GENERATION;
458 }
459
460 return indexAccessor.getLastGeneration();
461 }
462
463 @Override
464 public InputStream getLoadIndexesInputStreamFromCluster(
465 long companyId, Address bootupAddress)
466 throws SystemException {
467
468 if (!isLoadIndexFromClusterEnabled()) {
469 return null;
470 }
471
472 InputStream inputStream = null;
473
474 try {
475 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
476 _getBootupClusterNodeObjectValuePair(bootupAddress);
477
478 URL url = bootupClusterNodeObjectValuePair.getValue();
479
480 URLConnection urlConnection = url.openConnection();
481
482 urlConnection.setDoOutput(true);
483
484 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
485 urlConnection.getOutputStream());
486
487 unsyncPrintWriter.write("transientToken=");
488 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
489 unsyncPrintWriter.write("&companyId=");
490 unsyncPrintWriter.write(String.valueOf(companyId));
491
492 unsyncPrintWriter.close();
493
494 inputStream = urlConnection.getInputStream();
495
496 return inputStream;
497 }
498 catch (IOException ioe) {
499 throw new SystemException(ioe);
500 }
501 }
502
503 @Override
504 public Set<String> getQueryTerms(Query query) {
505 String queryString = StringUtil.replace(
506 query.toString(), StringPool.STAR, StringPool.BLANK);
507
508 Query tempQuery = null;
509
510 try {
511 QueryParser queryParser = new QueryParser(
512 getVersion(), StringPool.BLANK, getAnalyzer());
513
514 tempQuery = parseQuery(queryParser, queryString);
515 }
516 catch (Exception e) {
517 if (_log.isWarnEnabled()) {
518 _log.warn("Unable to parse " + queryString);
519 }
520
521 tempQuery = query;
522 }
523
524 WeightedTerm[] weightedTerms = null;
525
526 for (String fieldName : Field.KEYWORDS) {
527 weightedTerms = QueryTermExtractor.getTerms(
528 tempQuery, false, fieldName);
529
530 if (weightedTerms.length > 0) {
531 break;
532 }
533 }
534
535 Set<String> queryTerms = new HashSet<String>();
536
537 for (WeightedTerm weightedTerm : weightedTerms) {
538 queryTerms.add(weightedTerm.getTerm());
539 }
540
541 return queryTerms;
542 }
543
544
547 @Deprecated
548 @Override
549 public IndexSearcher getSearcher(long companyId, boolean readOnly)
550 throws IOException {
551
552 IndexAccessor indexAccessor = getIndexAccessor(companyId);
553
554 IndexReader indexReader = IndexReader.open(
555 indexAccessor.getLuceneDir(), readOnly);
556
557 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
558
559 indexSearcher.setDefaultFieldSortScoring(true, false);
560 indexSearcher.setSimilarity(new FieldWeightSimilarity());
561
562 return indexSearcher;
563 }
564
565 @Override
566 public String getSnippet(
567 Query query, String field, String s, int maxNumFragments,
568 int fragmentLength, String fragmentSuffix, Formatter formatter)
569 throws IOException {
570
571 QueryScorer queryScorer = new QueryScorer(query, field);
572
573 Highlighter highlighter = new Highlighter(formatter, queryScorer);
574
575 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
576
577 TokenStream tokenStream = getAnalyzer().tokenStream(
578 field, new UnsyncStringReader(s));
579
580 try {
581 String snippet = highlighter.getBestFragments(
582 tokenStream, s, maxNumFragments, fragmentSuffix);
583
584 if (Validator.isNotNull(snippet) &&
585 !StringUtil.endsWith(snippet, fragmentSuffix) &&
586 !s.equals(snippet)) {
587
588 snippet = snippet.concat(fragmentSuffix);
589 }
590
591 return snippet;
592 }
593 catch (InvalidTokenOffsetsException itoe) {
594 throw new IOException(itoe.getMessage());
595 }
596 }
597
598 @Override
599 public Version getVersion() {
600 return _version;
601 }
602
603 @Override
604 public boolean isLoadIndexFromClusterEnabled() {
605 if (PropsValues.CLUSTER_LINK_ENABLED &&
606 PropsValues.LUCENE_REPLICATE_WRITE) {
607
608 return true;
609 }
610
611 if (_log.isDebugEnabled()) {
612 _log.debug("Load index from cluster is not enabled");
613 }
614
615 return false;
616 }
617
618 @Override
619 public void loadIndex(long companyId, InputStream inputStream)
620 throws IOException {
621
622 if (!isLoadIndexFromClusterEnabled()) {
623 return;
624 }
625
626 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
627
628 if (indexAccessor == null) {
629 if (_log.isInfoEnabled()) {
630 _log.info(
631 "Skip loading Lucene index files for company " + companyId +
632 " in favor of lazy loading");
633 }
634
635 return;
636 }
637
638 StopWatch stopWatch = new StopWatch();
639
640 stopWatch.start();
641
642 if (_log.isInfoEnabled()) {
643 _log.info(
644 "Start loading Lucene index files for company " + companyId);
645 }
646
647 indexAccessor.loadIndex(inputStream);
648
649 if (_log.isInfoEnabled()) {
650 _log.info(
651 "Finished loading index files for company " + companyId +
652 " in " + stopWatch.getTime() + " ms");
653 }
654 }
655
656 @Override
657 public void loadIndexesFromCluster(long companyId) throws SystemException {
658 if (!isLoadIndexFromClusterEnabled()) {
659 return;
660 }
661
662 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
663
664 if (indexAccessor == null) {
665 return;
666 }
667
668 long localLastGeneration = getLastGeneration(companyId);
669
670 _loadIndexFromCluster(indexAccessor, localLastGeneration);
671 }
672
673 @Override
674 public void releaseIndexSearcher(
675 long companyId, IndexSearcher indexSearcher)
676 throws IOException {
677
678 IndexAccessor indexAccessor = getIndexAccessor(companyId);
679
680 indexAccessor.releaseIndexSearcher(indexSearcher);
681 }
682
683 public void setAnalyzer(Analyzer analyzer) {
684 _analyzer = analyzer;
685 }
686
687 public void setVersion(Version version) {
688 _version = version;
689 }
690
691 @Override
692 public void shutdown() {
693 if (_luceneIndexThreadPoolExecutor != null) {
694 _luceneIndexThreadPoolExecutor.shutdownNow();
695
696 try {
697 _luceneIndexThreadPoolExecutor.awaitTermination(
698 60, TimeUnit.SECONDS);
699 }
700 catch (InterruptedException ie) {
701 _log.error("Lucene indexer shutdown interrupted", ie);
702 }
703 }
704
705 if (isLoadIndexFromClusterEnabled()) {
706 ClusterExecutorUtil.removeClusterEventListener(
707 _loadIndexClusterEventListener);
708 }
709
710 MessageBus messageBus = MessageBusUtil.getMessageBus();
711
712 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
713 String searchWriterDestinationName =
714 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
715
716 Destination searchWriteDestination = messageBus.getDestination(
717 searchWriterDestinationName);
718
719 if (searchWriteDestination != null) {
720 ThreadPoolExecutor threadPoolExecutor =
721 PortalExecutorManagerUtil.getPortalExecutor(
722 searchWriterDestinationName);
723
724 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
725
726 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
727
728 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
729 countDownLatch);
730
731 for (int i = 0; i < maxPoolSize; i++) {
732 threadPoolExecutor.submit(shutdownSyncJob);
733 }
734
735 try {
736 countDownLatch.await();
737 }
738 catch (InterruptedException ie) {
739 _log.error("Shutdown waiting interrupted", ie);
740 }
741
742 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
743
744 if (_log.isDebugEnabled()) {
745 _log.debug(
746 "Cancelled appending indexing jobs: " + runnables);
747 }
748
749 searchWriteDestination.close(true);
750 }
751 }
752
753 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
754 indexAccessor.close();
755 }
756 }
757
758 @Override
759 public void shutdown(long companyId) {
760 IndexAccessor indexAccessor = getIndexAccessor(companyId);
761
762 _indexAccessors.remove(indexAccessor);
763
764 indexAccessor.close();
765 }
766
767 @Override
768 public void startup(long companyId) {
769 if (!PropsValues.INDEX_ON_STARTUP) {
770 if (PropsValues.LUCENE_CLUSTER_INDEX_LOADING_ON_STARTUP) {
771 getIndexAccessor(companyId);
772 }
773
774 return;
775 }
776
777 if (_log.isInfoEnabled()) {
778 _log.info("Indexing Lucene on startup");
779 }
780
781 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
782
783 if (PropsValues.INDEX_WITH_THREAD) {
784 if (_luceneIndexThreadPoolExecutor == null) {
785
786
787
788
789
790 _luceneIndexThreadPoolExecutor =
791 PortalExecutorManagerUtil.getPortalExecutor(
792 LuceneHelperImpl.class.getName());
793 }
794
795 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
796 }
797 else {
798 luceneIndexer.reindex();
799 }
800 }
801
802 @Override
803 public void updateDocument(long companyId, Term term, Document document)
804 throws IOException {
805
806 IndexAccessor indexAccessor = getIndexAccessor(companyId);
807
808 indexAccessor.updateDocument(term, document);
809 }
810
811 protected Query parseQuery(QueryParser queryParser, String queryString)
812 throws ParseException {
813
814 try {
815 return queryParser.parse(queryString);
816 }
817 catch (ParseException e) {
818 return queryParser.parse(KeywordsUtil.escape(queryString));
819 }
820 }
821
822 private LuceneHelperImpl() {
823 if ((PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) ||
824 isLoadIndexFromClusterEnabled()) {
825
826 _luceneIndexThreadPoolExecutor =
827 PortalExecutorManagerUtil.getPortalExecutor(
828 LuceneHelperImpl.class.getName());
829 }
830
831 if (isLoadIndexFromClusterEnabled()) {
832 _loadIndexClusterEventListener =
833 new LoadIndexClusterEventListener();
834
835 ClusterExecutorUtil.addClusterEventListener(
836 _loadIndexClusterEventListener);
837
838 CompanyInitalizedListener companyInitalizedListener =
839 new CompanyInitalizedListener();
840
841 companyInitalizedListener.registerPortalLifecycle(
842 PortalLifecycle.METHOD_INIT);
843 }
844
845 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
846 }
847
848 private ObjectValuePair<String, URL>
849 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
850 throws SystemException {
851
852 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
853 new MethodHandler(
854 _createTokenMethodKey,
855 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
856 bootupAddress);
857
858 FutureClusterResponses futureClusterResponses =
859 ClusterExecutorUtil.execute(clusterRequest);
860
861 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
862 futureClusterResponses.getPartialResults();
863
864 try {
865 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
866 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
867 TimeUnit.MILLISECONDS);
868
869 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
870
871 int port = clusterNode.getPort();
872
873 if (port <= 0) {
874 StringBundler sb = new StringBundler(6);
875
876 sb.append("Invalid cluster node port ");
877 sb.append(port);
878 sb.append(". The port is set by the first request or ");
879 sb.append("configured in portal.properties by the properties ");
880 sb.append("\"portal.instance.http.port\" and ");
881 sb.append("\"portal.instance.https.port\".");
882
883 throw new Exception(sb.toString());
884 }
885
886 String protocol = clusterNode.getPortalProtocol();
887
888 if (Validator.isNull(protocol)) {
889 StringBundler sb = new StringBundler(4);
890
891 sb.append("Cluster node protocol is empty. The protocol is ");
892 sb.append("set by the first request or configured in ");
893 sb.append("portal.properties by the property ");
894 sb.append("\"portal.instance.protocol\"");
895
896 throw new Exception(sb.toString());
897 }
898
899 InetAddress inetAddress = clusterNode.getInetAddress();
900
901 String hostName = null;
902
903 if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
904 hostName = inetAddress.getCanonicalHostName();
905 }
906 else {
907 hostName = inetAddress.getHostAddress();
908 }
909
910 String fileName = PortalUtil.getPathContext();
911
912 if (!fileName.endsWith(StringPool.SLASH)) {
913 fileName = fileName.concat(StringPool.SLASH);
914 }
915
916 fileName = fileName.concat("lucene/dump");
917
918 URL url = new URL(protocol, hostName, port, fileName);
919
920 String transientToken = (String)clusterNodeResponse.getResult();
921
922 return new ObjectValuePair<String, URL>(transientToken, url);
923 }
924 catch (Exception e) {
925 throw new SystemException(e);
926 }
927 }
928
929 private void _handleFutureClusterResponses(
930 FutureClusterResponses futureClusterResponses,
931 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
932 long localLastGeneration) {
933
934 BlockingQueue<ClusterNodeResponse> blockingQueue =
935 futureClusterResponses.getPartialResults();
936
937 long companyId = indexAccessor.getCompanyId();
938
939 Address bootupAddress = null;
940
941 do {
942 clusterNodeAddressesCount--;
943
944 ClusterNodeResponse clusterNodeResponse = null;
945
946 try {
947 clusterNodeResponse = blockingQueue.poll(
948 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
949 TimeUnit.MILLISECONDS);
950 }
951 catch (Exception e) {
952 _log.error("Unable to get cluster node response", e);
953 }
954
955 if (clusterNodeResponse == null) {
956 if (_log.isDebugEnabled()) {
957 _log.debug(
958 "Unable to get cluster node response in " +
959 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
960 TimeUnit.MILLISECONDS);
961 }
962
963 continue;
964 }
965
966 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
967
968 if (clusterNode.getPort() > 0) {
969 try {
970 long remoteLastGeneration =
971 (Long)clusterNodeResponse.getResult();
972
973 if (remoteLastGeneration > localLastGeneration) {
974 bootupAddress = clusterNodeResponse.getAddress();
975
976 break;
977 }
978 }
979 catch (Exception e) {
980 if (_log.isDebugEnabled()) {
981 _log.debug(
982 "Suppress exception caused by remote method " +
983 "invocation",
984 e);
985 }
986
987 continue;
988 }
989 }
990 else {
991 if (_log.isDebugEnabled()) {
992 _log.debug(
993 "Cluster node " + clusterNode +
994 " has invalid port");
995 }
996 }
997 }
998 while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
999
1000 if (bootupAddress == null) {
1001 return;
1002 }
1003
1004 if (_log.isInfoEnabled()) {
1005 _log.info(
1006 "Start loading lucene index files from cluster node " +
1007 bootupAddress);
1008 }
1009
1010 InputStream inputStream = null;
1011
1012 try {
1013 inputStream = getLoadIndexesInputStreamFromCluster(
1014 companyId, bootupAddress);
1015
1016 indexAccessor.loadIndex(inputStream);
1017
1018 if (_log.isInfoEnabled()) {
1019 _log.info("Lucene index files loaded successfully");
1020 }
1021 }
1022 catch (Exception e) {
1023 _log.error("Unable to load index for company " + companyId, e);
1024 }
1025 finally {
1026 if (inputStream != null) {
1027 try {
1028 inputStream.close();
1029 }
1030 catch (IOException ioe) {
1031 _log.error(
1032 "Unable to close input stream for company " +
1033 companyId,
1034 ioe);
1035 }
1036 }
1037 }
1038 }
1039
1040 private void _includeIfUnique(
1041 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1042 Query query, BooleanClause.Occur occur) {
1043
1044 if (query instanceof TermQuery) {
1045 Set<Term> terms = new HashSet<Term>();
1046
1047 TermQuery termQuery = (TermQuery)query;
1048
1049 termQuery.extractTerms(terms);
1050
1051 float boost = termQuery.getBoost();
1052
1053 for (Term term : terms) {
1054 String termValue = term.text();
1055
1056 if (like) {
1057 termValue = termValue.toLowerCase(queryParser.getLocale());
1058
1059 termValue = termValue.concat(StringPool.STAR);
1060
1061 if (PropsValues.INDEX_SEARCH_LEADING_WILDCARD_ENABLED) {
1062 termValue = StringPool.STAR.concat(termValue);
1063 }
1064
1065 term = term.createTerm(termValue);
1066
1067 query = new WildcardQuery(term);
1068 }
1069 else {
1070 query = new TermQuery(term);
1071 }
1072
1073 query.setBoost(boost);
1074
1075 boolean included = false;
1076
1077 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1078 if (query.equals(booleanClause.getQuery())) {
1079 included = true;
1080 }
1081 }
1082
1083 if (!included) {
1084 booleanQuery.add(query, occur);
1085 }
1086 }
1087 }
1088 else if (query instanceof BooleanQuery) {
1089 BooleanQuery curBooleanQuery = (BooleanQuery)query;
1090
1091 BooleanQuery containerBooleanQuery = new BooleanQuery();
1092
1093 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1094 _includeIfUnique(
1095 containerBooleanQuery, like, queryParser,
1096 booleanClause.getQuery(), booleanClause.getOccur());
1097 }
1098
1099 if (containerBooleanQuery.getClauses().length > 0) {
1100 booleanQuery.add(containerBooleanQuery, occur);
1101 }
1102 }
1103 else {
1104 boolean included = false;
1105
1106 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1107 if (query.equals(booleanClause.getQuery())) {
1108 included = true;
1109 }
1110 }
1111
1112 if (!included) {
1113 booleanQuery.add(query, occur);
1114 }
1115 }
1116 }
1117
1118 private void _loadIndexFromCluster(
1119 IndexAccessor indexAccessor, long localLastGeneration)
1120 throws SystemException {
1121
1122 List<Address> clusterNodeAddresses =
1123 ClusterExecutorUtil.getClusterNodeAddresses();
1124
1125 int clusterNodeAddressesCount = clusterNodeAddresses.size();
1126
1127 if (clusterNodeAddressesCount <= 1) {
1128 if (_log.isDebugEnabled()) {
1129 _log.debug(
1130 "Do not load indexes because there is either one portal " +
1131 "instance or no portal instances in the cluster");
1132 }
1133
1134 return;
1135 }
1136
1137 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1138 new MethodHandler(
1139 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1140 true);
1141
1142 FutureClusterResponses futureClusterResponses =
1143 ClusterExecutorUtil.execute(clusterRequest);
1144
1145 _handleFutureClusterResponses(
1146 futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1147 localLastGeneration);
1148 }
1149
1150 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1151 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1152
1153 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1154 GetterUtil.getInteger(
1155 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1156 BooleanQuery.getMaxClauseCount());
1157
1158 private static final String[] _KEYWORDS_LOWERCASE = {
1159 " and ", " not ", " or "
1160 };
1161
1162 private static final String[] _KEYWORDS_UPPERCASE = {
1163 " AND ", " NOT ", " OR "
1164 };
1165
1166 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1167
1168 private static MethodKey _createTokenMethodKey = new MethodKey(
1169 TransientTokenUtil.class, "createToken", long.class);
1170 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1171 LuceneHelperUtil.class, "getLastGeneration", long.class);
1172
1173 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
1174
1175 private Analyzer _analyzer;
1176 private Map<Long, IndexAccessor> _indexAccessors =
1177 new ConcurrentHashMap<Long, IndexAccessor>();
1178 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1179 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1180 private Version _version;
1181
1182 private static class ShutdownSyncJob implements Runnable {
1183
1184 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1185 _countDownLatch = countDownLatch;
1186 }
1187
1188 @Override
1189 public void run() {
1190 _countDownLatch.countDown();
1191
1192 try {
1193 synchronized (this) {
1194 wait();
1195 }
1196 }
1197 catch (InterruptedException ie) {
1198 }
1199 }
1200
1201 private final CountDownLatch _countDownLatch;
1202
1203 }
1204
1205 private class ClusterIndexLoader implements Runnable {
1206
1207 @Override
1208 public void run() {
1209 long lastGeneration = getLastGeneration(_companyId);
1210
1211 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1212 return;
1213 }
1214
1215 try {
1216 LuceneClusterUtil.loadIndexesFromCluster(_companyId);
1217 }
1218 catch (Exception e) {
1219 _log.error(
1220 "Unable to load indexes for company " + _companyId, e);
1221 }
1222 }
1223
1224 private ClusterIndexLoader(long companyId) {
1225 _companyId = companyId;
1226 }
1227
1228 private final long _companyId;
1229
1230 }
1231
1232 private class CompanyInitalizedListener extends BasePortalLifecycle {
1233
1234 @Override
1235 protected void doPortalDestroy() throws Exception {
1236 }
1237
1238 @Override
1239 protected void doPortalInit() throws Exception {
1240 _countDownLatch.countDown();
1241 }
1242
1243 }
1244
1245 private class LoadIndexClusterEventListener
1246 implements ClusterEventListener {
1247
1248 @Override
1249 public void processClusterEvent(ClusterEvent clusterEvent) {
1250 ClusterEventType clusterEventType =
1251 clusterEvent.getClusterEventType();
1252
1253 if (clusterEventType != ClusterEventType.JOIN) {
1254 return;
1255 }
1256
1257 List<Address> clusterNodeAddresses =
1258 ClusterExecutorUtil.getClusterNodeAddresses();
1259 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1260
1261 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1262 if (_log.isDebugEnabled()) {
1263 _log.debug(
1264 "Number of original cluster members is greater than " +
1265 "one");
1266 }
1267
1268 return;
1269 }
1270
1271 try {
1272 _countDownLatch.await();
1273 }
1274 catch (InterruptedException ie) {
1275 _log.error("Latch opened prematurely by interruption");
1276 }
1277
1278 long[] companyIds = PortalInstances.getCompanyIds();
1279
1280 for (long companyId : companyIds) {
1281 _luceneIndexThreadPoolExecutor.execute(
1282 new ClusterIndexLoader(companyId));
1283 }
1284
1285 _luceneIndexThreadPoolExecutor.execute(
1286 new ClusterIndexLoader(CompanyConstants.SYSTEM));
1287 }
1288
1289 }
1290
1291 }