001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLink;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.messaging.Destination;
035 import com.liferay.portal.kernel.messaging.MessageBus;
036 import com.liferay.portal.kernel.messaging.MessageBusUtil;
037 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038 import com.liferay.portal.kernel.search.BooleanClauseOccur;
039 import com.liferay.portal.kernel.search.Field;
040 import com.liferay.portal.kernel.search.SearchEngineUtil;
041 import com.liferay.portal.kernel.util.ArrayUtil;
042 import com.liferay.portal.kernel.util.BasePortalLifecycle;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.MethodHandler;
045 import com.liferay.portal.kernel.util.MethodKey;
046 import com.liferay.portal.kernel.util.ObjectValuePair;
047 import com.liferay.portal.kernel.util.PortalLifecycle;
048 import com.liferay.portal.kernel.util.PropsKeys;
049 import com.liferay.portal.kernel.util.StringBundler;
050 import com.liferay.portal.kernel.util.StringPool;
051 import com.liferay.portal.kernel.util.StringUtil;
052 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053 import com.liferay.portal.kernel.util.Validator;
054 import com.liferay.portal.model.CompanyConstants;
055 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057 import com.liferay.portal.security.auth.TransientTokenUtil;
058 import com.liferay.portal.util.PortalInstances;
059 import com.liferay.portal.util.PortalUtil;
060 import com.liferay.portal.util.PropsUtil;
061 import com.liferay.portal.util.PropsValues;
062 import com.liferay.util.lucene.KeywordsUtil;
063
064 import java.io.IOException;
065 import java.io.InputStream;
066 import java.io.OutputStream;
067
068 import java.net.InetAddress;
069 import java.net.URL;
070 import java.net.URLConnection;
071
072 import java.util.HashSet;
073 import java.util.List;
074 import java.util.Map;
075 import java.util.Set;
076 import java.util.concurrent.BlockingQueue;
077 import java.util.concurrent.ConcurrentHashMap;
078 import java.util.concurrent.CountDownLatch;
079 import java.util.concurrent.TimeUnit;
080
081 import org.apache.commons.lang.time.StopWatch;
082 import org.apache.lucene.analysis.Analyzer;
083 import org.apache.lucene.analysis.TokenStream;
084 import org.apache.lucene.document.Document;
085 import org.apache.lucene.index.IndexReader;
086 import org.apache.lucene.index.Term;
087 import org.apache.lucene.queryParser.ParseException;
088 import org.apache.lucene.queryParser.QueryParser;
089 import org.apache.lucene.search.BooleanClause;
090 import org.apache.lucene.search.BooleanQuery;
091 import org.apache.lucene.search.IndexSearcher;
092 import org.apache.lucene.search.NumericRangeQuery;
093 import org.apache.lucene.search.Query;
094 import org.apache.lucene.search.TermQuery;
095 import org.apache.lucene.search.TermRangeQuery;
096 import org.apache.lucene.search.WildcardQuery;
097 import org.apache.lucene.search.highlight.Formatter;
098 import org.apache.lucene.search.highlight.Highlighter;
099 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
100 import org.apache.lucene.search.highlight.QueryScorer;
101 import org.apache.lucene.search.highlight.SimpleFragmenter;
102 import org.apache.lucene.search.highlight.WeightedTerm;
103 import org.apache.lucene.util.Version;
104
105
114 public class LuceneHelperImpl implements LuceneHelper {
115
116 @Override
117 public void addDocument(long companyId, Document document)
118 throws IOException {
119
120 IndexAccessor indexAccessor = getIndexAccessor(companyId);
121
122 indexAccessor.addDocument(document);
123 }
124
125 @Override
126 public void addExactTerm(
127 BooleanQuery booleanQuery, String field, String value) {
128
129 addTerm(booleanQuery, field, value, false);
130 }
131
132 @Override
133 public void addNumericRangeTerm(
134 BooleanQuery booleanQuery, String field, Integer startValue,
135 Integer endValue) {
136
137 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
138 field, startValue, endValue, true, true);
139
140 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
141 }
142
143 @Override
144 public void addNumericRangeTerm(
145 BooleanQuery booleanQuery, String field, Long startValue,
146 Long endValue) {
147
148 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
149 field, startValue, endValue, true, true);
150
151 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
152 }
153
154
158 @Override
159 public void addNumericRangeTerm(
160 BooleanQuery booleanQuery, String field, String startValue,
161 String endValue) {
162
163 addNumericRangeTerm(
164 booleanQuery, field, GetterUtil.getLong(startValue),
165 GetterUtil.getLong(endValue));
166 }
167
168 @Override
169 public void addRangeTerm(
170 BooleanQuery booleanQuery, String field, String startValue,
171 String endValue) {
172
173 boolean includesLower = true;
174
175 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
176 includesLower = false;
177 }
178
179 boolean includesUpper = true;
180
181 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
182 includesUpper = false;
183 }
184
185 TermRangeQuery termRangeQuery = new TermRangeQuery(
186 field, startValue, endValue, includesLower, includesUpper);
187
188 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
189 }
190
191 @Override
192 public void addRequiredTerm(
193 BooleanQuery booleanQuery, String field, String value, boolean like) {
194
195 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
196 }
197
198 @Override
199 public void addRequiredTerm(
200 BooleanQuery booleanQuery, String field, String[] values,
201 boolean like) {
202
203 if (values == null) {
204 return;
205 }
206
207 BooleanQuery query = new BooleanQuery();
208
209 for (String value : values) {
210 addTerm(query, field, value, like);
211 }
212
213 booleanQuery.add(query, BooleanClause.Occur.MUST);
214 }
215
216 @Override
217 public void addTerm(
218 BooleanQuery booleanQuery, String field, String value, boolean like) {
219
220 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
221 }
222
223 @Override
224 public void addTerm(
225 BooleanQuery booleanQuery, String field, String value, boolean like,
226 BooleanClauseOccur booleanClauseOccur) {
227
228 if (Validator.isNull(value)) {
229 return;
230 }
231
232 Analyzer analyzer = getAnalyzer();
233
234 if (!like) {
235 like = isLikeField(field);
236 }
237
238 if (like) {
239 value = StringUtil.replace(
240 value, StringPool.PERCENT, StringPool.BLANK);
241 }
242
243 try {
244 QueryParser queryParser = new QueryParser(
245 getVersion(), field, analyzer);
246
247 value = StringUtil.replace(
248 value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
249
250 Query query = parseQuery(queryParser, value);
251
252 BooleanClause.Occur occur = null;
253
254 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
255 occur = BooleanClause.Occur.MUST;
256 }
257 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
258 occur = BooleanClause.Occur.MUST_NOT;
259 }
260 else {
261 occur = BooleanClause.Occur.SHOULD;
262 }
263
264 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
265 }
266 catch (BooleanQuery.TooManyClauses tmc) {
267 _log.error(
268 "The value in the portal property \"" +
269 PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE +
270 "\" is too small",
271 tmc);
272 }
273 catch (Exception e) {
274 if (_log.isWarnEnabled()) {
275 _log.warn(e, e);
276 }
277 }
278 }
279
280 @Override
281 public void addTerm(
282 BooleanQuery booleanQuery, String field, String[] values,
283 boolean like) {
284
285 for (String value : values) {
286 addTerm(booleanQuery, field, value, like);
287 }
288 }
289
290
294 @Deprecated
295 @Override
296 public void cleanUp(IndexSearcher indexSearcher) {
297 if (indexSearcher == null) {
298 return;
299 }
300
301 try {
302 indexSearcher.close();
303
304 IndexReader indexReader = indexSearcher.getIndexReader();
305
306 if (indexReader != null) {
307 indexReader.close();
308 }
309 }
310 catch (IOException ioe) {
311 _log.error(ioe, ioe);
312 }
313 }
314
315 @Override
316 public int countScoredFieldNames(Query query, String[] filedNames) {
317 int count = 0;
318
319 for (String fieldName : filedNames) {
320 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
321 query, false, fieldName);
322
323 if ((weightedTerms.length > 0) &&
324 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
325
326 count++;
327 }
328 }
329
330 return count;
331 }
332
333 @Override
334 public void delete(long companyId) {
335 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
336
337 if (indexAccessor == null) {
338 return;
339 }
340
341 indexAccessor.delete();
342 }
343
344 @Override
345 public void deleteDocuments(long companyId, Term term) throws IOException {
346 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347
348 if (indexAccessor == null) {
349 return;
350 }
351
352 indexAccessor.deleteDocuments(term);
353 }
354
355 @Override
356 public void dumpIndex(long companyId, OutputStream outputStream)
357 throws IOException {
358
359 long lastGeneration = getLastGeneration(companyId);
360
361 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
362 if (_log.isDebugEnabled()) {
363 _log.debug(
364 "Dump index from cluster is not enabled for " + companyId);
365 }
366
367 return;
368 }
369
370 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
371
372 if (indexAccessor == null) {
373 return;
374 }
375
376 indexAccessor.dumpIndex(outputStream);
377 }
378
379 @Override
380 public Analyzer getAnalyzer() {
381 if (!PropsValues.INDEX_PORTAL_FIELD_ANALYZER_ENABLED) {
382 return _keywordAnalyzer;
383 }
384
385 return _analyzer;
386 }
387
388 @Override
389 public IndexAccessor getIndexAccessor(long companyId) {
390 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
391
392 if (indexAccessor != null) {
393 return indexAccessor;
394 }
395
396 synchronized (this) {
397 indexAccessor = _indexAccessors.get(companyId);
398
399 if (indexAccessor == null) {
400 indexAccessor = new IndexAccessorImpl(companyId);
401
402 if (isLoadIndexFromClusterEnabled()) {
403 indexAccessor = new SynchronizedIndexAccessorImpl(
404 indexAccessor);
405
406 boolean clusterForwardMessage = GetterUtil.getBoolean(
407 MessageValuesThreadLocal.getValue(
408 ClusterLink.CLUSTER_FORWARD_MESSAGE));
409
410 if (clusterForwardMessage) {
411 if (_log.isInfoEnabled()) {
412 _log.info(
413 "Skip Luncene index files cluster loading " +
414 "since this is a manual reindex request");
415 }
416 }
417 else {
418 try {
419 _loadIndexFromCluster(
420 indexAccessor,
421 indexAccessor.getLastGeneration());
422 }
423 catch (Exception e) {
424 _log.error(
425 "Unable to load index for company " +
426 indexAccessor.getCompanyId(),
427 e);
428 }
429 }
430 }
431
432 _indexAccessors.put(companyId, indexAccessor);
433 }
434 }
435
436 return indexAccessor;
437 }
438
439 @Override
440 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
441 IndexAccessor indexAccessor = getIndexAccessor(companyId);
442
443 return indexAccessor.acquireIndexSearcher();
444 }
445
446 @Override
447 public long getLastGeneration(long companyId) {
448 if (!isLoadIndexFromClusterEnabled()) {
449 return IndexAccessor.DEFAULT_LAST_GENERATION;
450 }
451
452 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
453
454 if (indexAccessor == null) {
455 return IndexAccessor.DEFAULT_LAST_GENERATION;
456 }
457
458 return indexAccessor.getLastGeneration();
459 }
460
461 @Override
462 public InputStream getLoadIndexesInputStreamFromCluster(
463 long companyId, Address bootupAddress)
464 throws SystemException {
465
466 if (!isLoadIndexFromClusterEnabled()) {
467 return null;
468 }
469
470 InputStream inputStream = null;
471
472 try {
473 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
474 _getBootupClusterNodeObjectValuePair(bootupAddress);
475
476 URL url = bootupClusterNodeObjectValuePair.getValue();
477
478 URLConnection urlConnection = url.openConnection();
479
480 urlConnection.setDoOutput(true);
481
482 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
483 urlConnection.getOutputStream());
484
485 unsyncPrintWriter.write("transientToken=");
486 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
487 unsyncPrintWriter.write("&companyId=");
488 unsyncPrintWriter.write(String.valueOf(companyId));
489
490 unsyncPrintWriter.close();
491
492 inputStream = urlConnection.getInputStream();
493
494 return inputStream;
495 }
496 catch (IOException ioe) {
497 throw new SystemException(ioe);
498 }
499 }
500
501 @Override
502 public Set<String> getQueryTerms(Query query) {
503 String queryString = StringUtil.replace(
504 query.toString(), StringPool.STAR, StringPool.BLANK);
505
506 Query tempQuery = null;
507
508 try {
509 QueryParser queryParser = new QueryParser(
510 getVersion(), StringPool.BLANK, getAnalyzer());
511
512 tempQuery = parseQuery(queryParser, queryString);
513 }
514 catch (Exception e) {
515 if (_log.isWarnEnabled()) {
516 _log.warn("Unable to parse " + queryString);
517 }
518
519 tempQuery = query;
520 }
521
522 WeightedTerm[] weightedTerms = null;
523
524 for (String fieldName : Field.KEYWORDS) {
525 weightedTerms = QueryTermExtractor.getTerms(
526 tempQuery, false, fieldName);
527
528 if (weightedTerms.length > 0) {
529 break;
530 }
531 }
532
533 Set<String> queryTerms = new HashSet<String>();
534
535 for (WeightedTerm weightedTerm : weightedTerms) {
536 queryTerms.add(weightedTerm.getTerm());
537 }
538
539 return queryTerms;
540 }
541
542
545 @Deprecated
546 @Override
547 public IndexSearcher getSearcher(long companyId, boolean readOnly)
548 throws IOException {
549
550 IndexAccessor indexAccessor = getIndexAccessor(companyId);
551
552 IndexReader indexReader = IndexReader.open(
553 indexAccessor.getLuceneDir(), readOnly);
554
555 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
556
557 indexSearcher.setDefaultFieldSortScoring(true, false);
558 indexSearcher.setSimilarity(new FieldWeightSimilarity());
559
560 return indexSearcher;
561 }
562
563 @Override
564 public String getSnippet(
565 Query query, String field, String s, int maxNumFragments,
566 int fragmentLength, String fragmentSuffix, Formatter formatter)
567 throws IOException {
568
569 QueryScorer queryScorer = new QueryScorer(query, field);
570
571 Highlighter highlighter = new Highlighter(formatter, queryScorer);
572
573 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
574
575 TokenStream tokenStream = getAnalyzer().tokenStream(
576 field, new UnsyncStringReader(s));
577
578 try {
579 String snippet = highlighter.getBestFragments(
580 tokenStream, s, maxNumFragments, fragmentSuffix);
581
582 if (Validator.isNotNull(snippet) &&
583 !StringUtil.endsWith(snippet, fragmentSuffix) &&
584 !s.equals(snippet)) {
585
586 snippet = snippet.concat(fragmentSuffix);
587 }
588
589 return snippet;
590 }
591 catch (InvalidTokenOffsetsException itoe) {
592 throw new IOException(itoe.getMessage());
593 }
594 }
595
596 @Override
597 public Version getVersion() {
598 return _version;
599 }
600
601 @Override
602 public boolean isLoadIndexFromClusterEnabled() {
603 if (PropsValues.CLUSTER_LINK_ENABLED &&
604 PropsValues.LUCENE_REPLICATE_WRITE) {
605
606 return true;
607 }
608
609 if (_log.isDebugEnabled()) {
610 _log.debug("Load index from cluster is not enabled");
611 }
612
613 return false;
614 }
615
616 public boolean isLikeField(String field) {
617 if (_analyzer instanceof PerFieldAnalyzer) {
618 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)_analyzer;
619
620 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
621
622 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
623 return true;
624 }
625 }
626
627 return false;
628 }
629
630 @Override
631 public void loadIndex(long companyId, InputStream inputStream)
632 throws IOException {
633
634 if (!isLoadIndexFromClusterEnabled()) {
635 return;
636 }
637
638 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
639
640 if (indexAccessor == null) {
641 if (_log.isInfoEnabled()) {
642 _log.info(
643 "Skip loading Lucene index files for company " + companyId +
644 " in favor of lazy loading");
645 }
646
647 return;
648 }
649
650 StopWatch stopWatch = new StopWatch();
651
652 stopWatch.start();
653
654 if (_log.isInfoEnabled()) {
655 _log.info(
656 "Start loading Lucene index files for company " + companyId);
657 }
658
659 indexAccessor.loadIndex(inputStream);
660
661 if (_log.isInfoEnabled()) {
662 _log.info(
663 "Finished loading index files for company " + companyId +
664 " in " + stopWatch.getTime() + " ms");
665 }
666 }
667
668 @Override
669 public void loadIndexesFromCluster(long companyId) throws SystemException {
670 if (!isLoadIndexFromClusterEnabled()) {
671 return;
672 }
673
674 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
675
676 if (indexAccessor == null) {
677 return;
678 }
679
680 long localLastGeneration = getLastGeneration(companyId);
681
682 _loadIndexFromCluster(indexAccessor, localLastGeneration);
683 }
684
685 @Override
686 public void releaseIndexSearcher(
687 long companyId, IndexSearcher indexSearcher)
688 throws IOException {
689
690 IndexAccessor indexAccessor = getIndexAccessor(companyId);
691
692 indexAccessor.releaseIndexSearcher(indexSearcher);
693 }
694
695 public void setAnalyzer(Analyzer analyzer) {
696 _analyzer = analyzer;
697 }
698
699 public void setKeywordAnalyzer(Analyzer keywordAnalyzer) {
700 _keywordAnalyzer = keywordAnalyzer;
701 }
702
703 public void setVersion(Version version) {
704 _version = version;
705 }
706
707 @Override
708 public void shutdown() {
709 if (_luceneIndexThreadPoolExecutor != null) {
710 _luceneIndexThreadPoolExecutor.shutdownNow();
711
712 try {
713 _luceneIndexThreadPoolExecutor.awaitTermination(
714 60, TimeUnit.SECONDS);
715 }
716 catch (InterruptedException ie) {
717 _log.error("Lucene indexer shutdown interrupted", ie);
718 }
719 }
720
721 if (isLoadIndexFromClusterEnabled()) {
722 ClusterExecutorUtil.removeClusterEventListener(
723 _loadIndexClusterEventListener);
724 }
725
726 MessageBus messageBus = MessageBusUtil.getMessageBus();
727
728 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
729 String searchWriterDestinationName =
730 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
731
732 Destination searchWriteDestination = messageBus.getDestination(
733 searchWriterDestinationName);
734
735 if (searchWriteDestination != null) {
736 ThreadPoolExecutor threadPoolExecutor =
737 PortalExecutorManagerUtil.getPortalExecutor(
738 searchWriterDestinationName);
739
740 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
741
742 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
743
744 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
745 countDownLatch);
746
747 for (int i = 0; i < maxPoolSize; i++) {
748 threadPoolExecutor.submit(shutdownSyncJob);
749 }
750
751 try {
752 countDownLatch.await();
753 }
754 catch (InterruptedException ie) {
755 _log.error("Shutdown waiting interrupted", ie);
756 }
757
758 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
759
760 if (_log.isDebugEnabled()) {
761 _log.debug(
762 "Cancelled appending indexing jobs: " + runnables);
763 }
764
765 searchWriteDestination.close(true);
766 }
767 }
768
769 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
770 indexAccessor.close();
771 }
772 }
773
774 @Override
775 public void shutdown(long companyId) {
776 IndexAccessor indexAccessor = getIndexAccessor(companyId);
777
778 _indexAccessors.remove(indexAccessor);
779
780 indexAccessor.close();
781 }
782
783 @Override
784 public void startup(long companyId) {
785 if (!PropsValues.INDEX_ON_STARTUP) {
786 if (PropsValues.LUCENE_CLUSTER_INDEX_LOADING_ON_STARTUP) {
787 getIndexAccessor(companyId);
788 }
789
790 return;
791 }
792
793 if (_log.isInfoEnabled()) {
794 _log.info("Indexing Lucene on startup");
795 }
796
797 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
798
799 if (PropsValues.INDEX_WITH_THREAD) {
800 if (_luceneIndexThreadPoolExecutor == null) {
801
802
803
804
805
806 _luceneIndexThreadPoolExecutor =
807 PortalExecutorManagerUtil.getPortalExecutor(
808 LuceneHelperImpl.class.getName());
809 }
810
811 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
812 }
813 else {
814 luceneIndexer.reindex();
815 }
816 }
817
818 @Override
819 public void updateDocument(long companyId, Term term, Document document)
820 throws IOException {
821
822 IndexAccessor indexAccessor = getIndexAccessor(companyId);
823
824 indexAccessor.updateDocument(term, document);
825 }
826
827 protected Query parseQuery(QueryParser queryParser, String queryString)
828 throws ParseException {
829
830 try {
831 return queryParser.parse(queryString);
832 }
833 catch (ParseException e) {
834 return queryParser.parse(KeywordsUtil.escape(queryString));
835 }
836 }
837
/**
 * Prepares the indexing executor when it will be needed: for threaded
 * startup indexing, or for cluster loading.
 *
 * <p>When cluster loading is enabled, it also registers the cluster join
 * listener and a portal-init hook that opens _countDownLatch. Finally, it
 * applies the configured BooleanQuery clause limit.</p>
 */
private LuceneHelperImpl() {
    boolean threadedStartupIndexing =
        PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

    if (threadedStartupIndexing || isLoadIndexFromClusterEnabled()) {
        _luceneIndexThreadPoolExecutor =
            PortalExecutorManagerUtil.getPortalExecutor(
                LuceneHelperImpl.class.getName());
    }

    if (isLoadIndexFromClusterEnabled()) {
        _loadIndexClusterEventListener = new LoadIndexClusterEventListener();

        ClusterExecutorUtil.addClusterEventListener(
            _loadIndexClusterEventListener);

        new CompanyInitalizedListener().registerPortalLifecycle(
            PortalLifecycle.METHOD_INIT);
    }

    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
}
863
864 private ObjectValuePair<String, URL>
865 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
866 throws SystemException {
867
868 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
869 new MethodHandler(
870 _createTokenMethodKey,
871 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
872 bootupAddress);
873
874 FutureClusterResponses futureClusterResponses =
875 ClusterExecutorUtil.execute(clusterRequest);
876
877 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
878 futureClusterResponses.getPartialResults();
879
880 try {
881 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
882 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
883 TimeUnit.MILLISECONDS);
884
885 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
886
887 int port = clusterNode.getPort();
888
889 if (port <= 0) {
890 StringBundler sb = new StringBundler(6);
891
892 sb.append("Invalid cluster node port ");
893 sb.append(port);
894 sb.append(". The port is set by the first request or ");
895 sb.append("configured in portal.properties by the properties ");
896 sb.append("\"portal.instance.http.port\" and ");
897 sb.append("\"portal.instance.https.port\".");
898
899 throw new Exception(sb.toString());
900 }
901
902 String protocol = clusterNode.getPortalProtocol();
903
904 if (Validator.isNull(protocol)) {
905 StringBundler sb = new StringBundler(4);
906
907 sb.append("Cluster node protocol is empty. The protocol is ");
908 sb.append("set by the first request or configured in ");
909 sb.append("portal.properties by the property ");
910 sb.append("\"portal.instance.protocol\"");
911
912 throw new Exception(sb.toString());
913 }
914
915 InetAddress inetAddress = clusterNode.getInetAddress();
916
917 String hostName = null;
918
919 if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
920 hostName = inetAddress.getCanonicalHostName();
921 }
922 else {
923 hostName = inetAddress.getHostAddress();
924 }
925
926 String fileName = PortalUtil.getPathContext();
927
928 if (!fileName.endsWith(StringPool.SLASH)) {
929 fileName = fileName.concat(StringPool.SLASH);
930 }
931
932 fileName = fileName.concat("lucene/dump");
933
934 URL url = new URL(protocol, hostName, port, fileName);
935
936 String transientToken = (String)clusterNodeResponse.getResult();
937
938 return new ObjectValuePair<String, URL>(transientToken, url);
939 }
940 catch (Exception e) {
941 throw new SystemException(e);
942 }
943 }
944
945 private void _handleFutureClusterResponses(
946 FutureClusterResponses futureClusterResponses,
947 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
948 long localLastGeneration) {
949
950 BlockingQueue<ClusterNodeResponse> blockingQueue =
951 futureClusterResponses.getPartialResults();
952
953 long companyId = indexAccessor.getCompanyId();
954
955 Address bootupAddress = null;
956
957 do {
958 clusterNodeAddressesCount--;
959
960 ClusterNodeResponse clusterNodeResponse = null;
961
962 try {
963 clusterNodeResponse = blockingQueue.poll(
964 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
965 TimeUnit.MILLISECONDS);
966 }
967 catch (Exception e) {
968 _log.error("Unable to get cluster node response", e);
969 }
970
971 if (clusterNodeResponse == null) {
972 if (_log.isDebugEnabled()) {
973 _log.debug(
974 "Unable to get cluster node response in " +
975 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
976 TimeUnit.MILLISECONDS);
977 }
978
979 continue;
980 }
981
982 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
983
984 if (clusterNode.getPort() > 0) {
985 try {
986 long remoteLastGeneration =
987 (Long)clusterNodeResponse.getResult();
988
989 if (remoteLastGeneration > localLastGeneration) {
990 bootupAddress = clusterNodeResponse.getAddress();
991
992 break;
993 }
994 }
995 catch (Exception e) {
996 if (_log.isDebugEnabled()) {
997 _log.debug(
998 "Suppress exception caused by remote method " +
999 "invocation",
1000 e);
1001 }
1002
1003 continue;
1004 }
1005 }
1006 else {
1007 if (_log.isDebugEnabled()) {
1008 _log.debug(
1009 "Cluster node " + clusterNode +
1010 " has invalid port");
1011 }
1012 }
1013 }
1014 while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
1015
1016 if (bootupAddress == null) {
1017 return;
1018 }
1019
1020 if (_log.isInfoEnabled()) {
1021 _log.info(
1022 "Start loading lucene index files from cluster node " +
1023 bootupAddress);
1024 }
1025
1026 InputStream inputStream = null;
1027
1028 try {
1029 inputStream = getLoadIndexesInputStreamFromCluster(
1030 companyId, bootupAddress);
1031
1032 indexAccessor.loadIndex(inputStream);
1033
1034 if (_log.isInfoEnabled()) {
1035 _log.info("Lucene index files loaded successfully");
1036 }
1037 }
1038 catch (Exception e) {
1039 _log.error("Unable to load index for company " + companyId, e);
1040 }
1041 finally {
1042 if (inputStream != null) {
1043 try {
1044 inputStream.close();
1045 }
1046 catch (IOException ioe) {
1047 _log.error(
1048 "Unable to close input stream for company " +
1049 companyId,
1050 ioe);
1051 }
1052 }
1053 }
1054 }
1055
1056 private void _includeIfUnique(
1057 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1058 Query query, BooleanClause.Occur occur) {
1059
1060 if (query instanceof TermQuery) {
1061 Set<Term> terms = new HashSet<Term>();
1062
1063 TermQuery termQuery = (TermQuery)query;
1064
1065 termQuery.extractTerms(terms);
1066
1067 float boost = termQuery.getBoost();
1068
1069 for (Term term : terms) {
1070 String termValue = term.text();
1071
1072 if (like) {
1073 termValue = termValue.toLowerCase(queryParser.getLocale());
1074
1075 termValue = termValue.concat(StringPool.STAR);
1076
1077 if (PropsValues.INDEX_SEARCH_LEADING_WILDCARD_ENABLED) {
1078 termValue = StringPool.STAR.concat(termValue);
1079 }
1080
1081 term = term.createTerm(termValue);
1082
1083 query = new WildcardQuery(term);
1084 }
1085 else {
1086 query = new TermQuery(term);
1087 }
1088
1089 query.setBoost(boost);
1090
1091 boolean included = false;
1092
1093 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1094 if (query.equals(booleanClause.getQuery())) {
1095 included = true;
1096 }
1097 }
1098
1099 if (!included) {
1100 booleanQuery.add(query, occur);
1101 }
1102 }
1103 }
1104 else if (query instanceof BooleanQuery) {
1105 BooleanQuery curBooleanQuery = (BooleanQuery)query;
1106
1107 BooleanQuery containerBooleanQuery = new BooleanQuery();
1108
1109 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1110 _includeIfUnique(
1111 containerBooleanQuery, like, queryParser,
1112 booleanClause.getQuery(), booleanClause.getOccur());
1113 }
1114
1115 if (containerBooleanQuery.getClauses().length > 0) {
1116 booleanQuery.add(containerBooleanQuery, occur);
1117 }
1118 }
1119 else {
1120 boolean included = false;
1121
1122 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1123 if (query.equals(booleanClause.getQuery())) {
1124 included = true;
1125 }
1126 }
1127
1128 if (!included) {
1129 booleanQuery.add(query, occur);
1130 }
1131 }
1132 }
1133
1134 private void _loadIndexFromCluster(
1135 IndexAccessor indexAccessor, long localLastGeneration)
1136 throws SystemException {
1137
1138 List<Address> clusterNodeAddresses =
1139 ClusterExecutorUtil.getClusterNodeAddresses();
1140
1141 int clusterNodeAddressesCount = clusterNodeAddresses.size();
1142
1143 if (clusterNodeAddressesCount <= 1) {
1144 if (_log.isDebugEnabled()) {
1145 _log.debug(
1146 "Do not load indexes because there is either one portal " +
1147 "instance or no portal instances in the cluster");
1148 }
1149
1150 return;
1151 }
1152
1153 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1154 new MethodHandler(
1155 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1156 true);
1157
1158 FutureClusterResponses futureClusterResponses =
1159 ClusterExecutorUtil.execute(clusterRequest);
1160
1161 _handleFutureClusterResponses(
1162 futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1163 localLastGeneration);
1164 }
1165
1166 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1167 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1168
1169 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1170 GetterUtil.getInteger(
1171 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1172 BooleanQuery.getMaxClauseCount());
1173
1174 private static final String[] _KEYWORDS_LOWERCASE = {
1175 " and ", " not ", " or "
1176 };
1177
1178 private static final String[] _KEYWORDS_UPPERCASE = {
1179 " AND ", " NOT ", " OR "
1180 };
1181
1182 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1183
1184 private static MethodKey _createTokenMethodKey = new MethodKey(
1185 TransientTokenUtil.class, "createToken", long.class);
1186 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1187 LuceneHelperUtil.class, "getLastGeneration", long.class);
1188
1189 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
1190
1191 private Analyzer _analyzer;
1192 private Map<Long, IndexAccessor> _indexAccessors =
1193 new ConcurrentHashMap<Long, IndexAccessor>();
1194 private Analyzer _keywordAnalyzer;
1195 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1196 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1197 private Version _version;
1198
1199 private static class ShutdownSyncJob implements Runnable {
1200
1201 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1202 _countDownLatch = countDownLatch;
1203 }
1204
1205 @Override
1206 public void run() {
1207 _countDownLatch.countDown();
1208
1209 try {
1210 synchronized (this) {
1211 wait();
1212 }
1213 }
1214 catch (InterruptedException ie) {
1215 }
1216 }
1217
1218 private final CountDownLatch _countDownLatch;
1219
1220 }
1221
1222 private class ClusterIndexLoader implements Runnable {
1223
1224 @Override
1225 public void run() {
1226 long lastGeneration = getLastGeneration(_companyId);
1227
1228 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1229 return;
1230 }
1231
1232 try {
1233 LuceneClusterUtil.loadIndexesFromCluster(_companyId);
1234 }
1235 catch (Exception e) {
1236 _log.error(
1237 "Unable to load indexes for company " + _companyId, e);
1238 }
1239 }
1240
1241 private ClusterIndexLoader(long companyId) {
1242 _companyId = companyId;
1243 }
1244
1245 private final long _companyId;
1246
1247 }
1248
1249 private class CompanyInitalizedListener extends BasePortalLifecycle {
1250
1251 @Override
1252 protected void doPortalDestroy() throws Exception {
1253 }
1254
1255 @Override
1256 protected void doPortalInit() throws Exception {
1257 _countDownLatch.countDown();
1258 }
1259
1260 }
1261
1262 private class LoadIndexClusterEventListener
1263 implements ClusterEventListener {
1264
1265 @Override
1266 public void processClusterEvent(ClusterEvent clusterEvent) {
1267 ClusterEventType clusterEventType =
1268 clusterEvent.getClusterEventType();
1269
1270 if (clusterEventType != ClusterEventType.JOIN) {
1271 return;
1272 }
1273
1274 List<Address> clusterNodeAddresses =
1275 ClusterExecutorUtil.getClusterNodeAddresses();
1276 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1277
1278 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1279 if (_log.isDebugEnabled()) {
1280 _log.debug(
1281 "Number of original cluster members is greater than " +
1282 "one");
1283 }
1284
1285 return;
1286 }
1287
1288 try {
1289 _countDownLatch.await();
1290 }
1291 catch (InterruptedException ie) {
1292 _log.error("Latch opened prematurely by interruption");
1293 }
1294
1295 long[] companyIds = PortalInstances.getCompanyIds();
1296
1297 for (long companyId : companyIds) {
1298 _luceneIndexThreadPoolExecutor.execute(
1299 new ClusterIndexLoader(companyId));
1300 }
1301
1302 _luceneIndexThreadPoolExecutor.execute(
1303 new ClusterIndexLoader(CompanyConstants.SYSTEM));
1304 }
1305
1306 }
1307
1308 }