001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Destination;
036 import com.liferay.portal.kernel.messaging.MessageBus;
037 import com.liferay.portal.kernel.messaging.MessageBusUtil;
038 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039 import com.liferay.portal.kernel.search.BooleanClauseOccur;
040 import com.liferay.portal.kernel.search.Field;
041 import com.liferay.portal.kernel.search.SearchEngineUtil;
042 import com.liferay.portal.kernel.util.ArrayUtil;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.MethodHandler;
045 import com.liferay.portal.kernel.util.MethodKey;
046 import com.liferay.portal.kernel.util.ObjectValuePair;
047 import com.liferay.portal.kernel.util.PropsKeys;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.kernel.util.StringUtil;
050 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051 import com.liferay.portal.kernel.util.Validator;
052 import com.liferay.portal.model.CompanyConstants;
053 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055 import com.liferay.portal.security.auth.TransientTokenUtil;
056 import com.liferay.portal.util.PortalInstances;
057 import com.liferay.portal.util.PortalUtil;
058 import com.liferay.portal.util.PropsUtil;
059 import com.liferay.portal.util.PropsValues;
060
061 import java.io.IOException;
062 import java.io.InputStream;
063 import java.io.OutputStream;
064
065 import java.net.InetAddress;
066 import java.net.URL;
067 import java.net.URLConnection;
068
069 import java.util.HashSet;
070 import java.util.List;
071 import java.util.Map;
072 import java.util.Set;
073 import java.util.concurrent.BlockingQueue;
074 import java.util.concurrent.ConcurrentHashMap;
075 import java.util.concurrent.TimeUnit;
076 import java.util.concurrent.TimeoutException;
077
078 import org.apache.commons.lang.time.StopWatch;
079 import org.apache.lucene.analysis.Analyzer;
080 import org.apache.lucene.analysis.TokenStream;
081 import org.apache.lucene.document.Document;
082 import org.apache.lucene.index.IndexReader;
083 import org.apache.lucene.index.Term;
084 import org.apache.lucene.queryParser.QueryParser;
085 import org.apache.lucene.search.BooleanClause;
086 import org.apache.lucene.search.BooleanQuery;
087 import org.apache.lucene.search.IndexSearcher;
088 import org.apache.lucene.search.NumericRangeQuery;
089 import org.apache.lucene.search.Query;
090 import org.apache.lucene.search.TermQuery;
091 import org.apache.lucene.search.TermRangeQuery;
092 import org.apache.lucene.search.WildcardQuery;
093 import org.apache.lucene.search.highlight.Formatter;
094 import org.apache.lucene.search.highlight.Highlighter;
095 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096 import org.apache.lucene.search.highlight.QueryScorer;
097 import org.apache.lucene.search.highlight.SimpleFragmenter;
098 import org.apache.lucene.search.highlight.WeightedTerm;
099 import org.apache.lucene.util.Version;
100
101
110 public class LuceneHelperImpl implements LuceneHelper {
111
112 @Override
113 public void addDocument(long companyId, Document document)
114 throws IOException {
115
116 IndexAccessor indexAccessor = getIndexAccessor(companyId);
117
118 indexAccessor.addDocument(document);
119 }
120
121 @Override
122 public void addExactTerm(
123 BooleanQuery booleanQuery, String field, String value) {
124
125 addTerm(booleanQuery, field, value, false);
126 }
127
128 @Override
129 public void addNumericRangeTerm(
130 BooleanQuery booleanQuery, String field, Integer startValue,
131 Integer endValue) {
132
133 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134 field, startValue, endValue, true, true);
135
136 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137 }
138
139 @Override
140 public void addNumericRangeTerm(
141 BooleanQuery booleanQuery, String field, Long startValue,
142 Long endValue) {
143
144 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145 field, startValue, endValue, true, true);
146
147 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148 }
149
150
154 @Override
155 public void addNumericRangeTerm(
156 BooleanQuery booleanQuery, String field, String startValue,
157 String endValue) {
158
159 addNumericRangeTerm(
160 booleanQuery, field, GetterUtil.getLong(startValue),
161 GetterUtil.getLong(endValue));
162 }
163
164 @Override
165 public void addRangeTerm(
166 BooleanQuery booleanQuery, String field, String startValue,
167 String endValue) {
168
169 boolean includesLower = true;
170
171 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172 includesLower = false;
173 }
174
175 boolean includesUpper = true;
176
177 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178 includesUpper = false;
179 }
180
181 TermRangeQuery termRangeQuery = new TermRangeQuery(
182 field, startValue, endValue, includesLower, includesUpper);
183
184 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185 }
186
187 @Override
188 public void addRequiredTerm(
189 BooleanQuery booleanQuery, String field, String value, boolean like) {
190
191 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192 }
193
194 @Override
195 public void addRequiredTerm(
196 BooleanQuery booleanQuery, String field, String[] values,
197 boolean like) {
198
199 if (values == null) {
200 return;
201 }
202
203 BooleanQuery query = new BooleanQuery();
204
205 for (String value : values) {
206 addTerm(query, field, value, like);
207 }
208
209 booleanQuery.add(query, BooleanClause.Occur.MUST);
210 }
211
212 @Override
213 public void addTerm(
214 BooleanQuery booleanQuery, String field, String value, boolean like) {
215
216 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217 }
218
219 @Override
220 public void addTerm(
221 BooleanQuery booleanQuery, String field, String value, boolean like,
222 BooleanClauseOccur booleanClauseOccur) {
223
224 if (Validator.isNull(value)) {
225 return;
226 }
227
228 Analyzer analyzer = getAnalyzer();
229
230 if (analyzer instanceof PerFieldAnalyzer) {
231 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232
233 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234
235 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236 like = true;
237 }
238 }
239
240 if (like) {
241 value = StringUtil.replace(
242 value, StringPool.PERCENT, StringPool.BLANK);
243 }
244
245 try {
246 QueryParser queryParser = new QueryParser(
247 getVersion(), field, analyzer);
248
249 Query query = queryParser.parse(value);
250
251 BooleanClause.Occur occur = null;
252
253 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
254 occur = BooleanClause.Occur.MUST;
255 }
256 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
257 occur = BooleanClause.Occur.MUST_NOT;
258 }
259 else {
260 occur = BooleanClause.Occur.SHOULD;
261 }
262
263 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
264 }
265 catch (Exception e) {
266 if (_log.isWarnEnabled()) {
267 _log.warn(e, e);
268 }
269 }
270 }
271
272 @Override
273 public void addTerm(
274 BooleanQuery booleanQuery, String field, String[] values,
275 boolean like) {
276
277 for (String value : values) {
278 addTerm(booleanQuery, field, value, like);
279 }
280 }
281
282 @Override
283 public void cleanUp(IndexSearcher indexSearcher) {
284 if (indexSearcher == null) {
285 return;
286 }
287
288 try {
289 indexSearcher.close();
290
291 IndexReader indexReader = indexSearcher.getIndexReader();
292
293 if (indexReader != null) {
294 indexReader.close();
295 }
296 }
297 catch (IOException ioe) {
298 _log.error(ioe, ioe);
299 }
300 }
301
302 @Override
303 public int countScoredFieldNames(Query query, String[] filedNames) {
304 int count = 0;
305
306 for (String fieldName : filedNames) {
307 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
308 query, false, fieldName);
309
310 if ((weightedTerms.length > 0) &&
311 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
312
313 count++;
314 }
315 }
316
317 return count;
318 }
319
320 @Override
321 public void delete(long companyId) {
322 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
323
324 if (indexAccessor == null) {
325 return;
326 }
327
328 indexAccessor.delete();
329 }
330
331 @Override
332 public void deleteDocuments(long companyId, Term term) throws IOException {
333 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
334
335 if (indexAccessor == null) {
336 return;
337 }
338
339 indexAccessor.deleteDocuments(term);
340 }
341
342 @Override
343 public void dumpIndex(long companyId, OutputStream outputStream)
344 throws IOException {
345
346 long lastGeneration = getLastGeneration(companyId);
347
348 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
349 if (_log.isDebugEnabled()) {
350 _log.debug(
351 "Dump index from cluster is not enabled for " + companyId);
352 }
353
354 return;
355 }
356
357 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
358
359 if (indexAccessor == null) {
360 return;
361 }
362
363 indexAccessor.dumpIndex(outputStream);
364 }
365
366 @Override
367 public Analyzer getAnalyzer() {
368 return _analyzer;
369 }
370
371 @Override
372 public IndexAccessor getIndexAccessor(long companyId) {
373 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
374
375 if (indexAccessor != null) {
376 return indexAccessor;
377 }
378
379 synchronized (this) {
380 indexAccessor = _indexAccessors.get(companyId);
381
382 if (indexAccessor == null) {
383 indexAccessor = new IndexAccessorImpl(companyId);
384
385 if (isLoadIndexFromClusterEnabled()) {
386 indexAccessor = new SynchronizedIndexAccessorImpl(
387 indexAccessor);
388
389 boolean clusterForwardMessage = GetterUtil.getBoolean(
390 MessageValuesThreadLocal.getValue(
391 ClusterLink.CLUSTER_FORWARD_MESSAGE));
392
393 if (clusterForwardMessage) {
394 if (_log.isInfoEnabled()) {
395 _log.info(
396 "Skip Luncene index files cluster loading " +
397 "since this is a manual reindex request");
398 }
399 }
400 else {
401 try {
402 _loadIndexFromCluster(
403 indexAccessor,
404 indexAccessor.getLastGeneration());
405 }
406 catch (Exception e) {
407 _log.error(
408 "Unable to load index for company " +
409 indexAccessor.getCompanyId(),
410 e);
411 }
412 }
413 }
414
415 _indexAccessors.put(companyId, indexAccessor);
416 }
417 }
418
419 return indexAccessor;
420 }
421
422 @Override
423 public long getLastGeneration(long companyId) {
424 if (!isLoadIndexFromClusterEnabled()) {
425 return IndexAccessor.DEFAULT_LAST_GENERATION;
426 }
427
428 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
429
430 if (indexAccessor == null) {
431 return IndexAccessor.DEFAULT_LAST_GENERATION;
432 }
433
434 return indexAccessor.getLastGeneration();
435 }
436
437 @Override
438 public InputStream getLoadIndexesInputStreamFromCluster(
439 long companyId, Address bootupAddress)
440 throws SystemException {
441
442 if (!isLoadIndexFromClusterEnabled()) {
443 return null;
444 }
445
446 InputStream inputStream = null;
447
448 try {
449 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
450 _getBootupClusterNodeObjectValuePair(bootupAddress);
451
452 URL url = bootupClusterNodeObjectValuePair.getValue();
453
454 URLConnection urlConnection = url.openConnection();
455
456 urlConnection.setDoOutput(true);
457
458 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
459 urlConnection.getOutputStream());
460
461 unsyncPrintWriter.write("transientToken=");
462 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
463 unsyncPrintWriter.write("&companyId=");
464 unsyncPrintWriter.write(String.valueOf(companyId));
465
466 unsyncPrintWriter.close();
467
468 inputStream = urlConnection.getInputStream();
469
470 return inputStream;
471 }
472 catch (IOException ioe) {
473 throw new SystemException(ioe);
474 }
475 }
476
477 @Override
478 public Set<String> getQueryTerms(Query query) {
479 String queryString = StringUtil.replace(
480 query.toString(), StringPool.STAR, StringPool.BLANK);
481
482 Query tempQuery = null;
483
484 try {
485 QueryParser queryParser = new QueryParser(
486 getVersion(), StringPool.BLANK, getAnalyzer());
487
488 tempQuery = queryParser.parse(queryString);
489 }
490 catch (Exception e) {
491 if (_log.isWarnEnabled()) {
492 _log.warn("Unable to parse " + queryString);
493 }
494
495 tempQuery = query;
496 }
497
498 WeightedTerm[] weightedTerms = null;
499
500 for (String fieldName : Field.KEYWORDS) {
501 weightedTerms = QueryTermExtractor.getTerms(
502 tempQuery, false, fieldName);
503
504 if (weightedTerms.length > 0) {
505 break;
506 }
507 }
508
509 Set<String> queryTerms = new HashSet<String>();
510
511 for (WeightedTerm weightedTerm : weightedTerms) {
512 queryTerms.add(weightedTerm.getTerm());
513 }
514
515 return queryTerms;
516 }
517
518 @Override
519 public IndexSearcher getSearcher(long companyId, boolean readOnly)
520 throws IOException {
521
522 IndexAccessor indexAccessor = getIndexAccessor(companyId);
523
524 IndexReader indexReader = IndexReader.open(
525 indexAccessor.getLuceneDir(), readOnly);
526
527 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
528
529 indexSearcher.setDefaultFieldSortScoring(true, true);
530 indexSearcher.setSimilarity(new FieldWeightSimilarity());
531
532 return indexSearcher;
533 }
534
535 @Override
536 public String getSnippet(
537 Query query, String field, String s, int maxNumFragments,
538 int fragmentLength, String fragmentSuffix, Formatter formatter)
539 throws IOException {
540
541 QueryScorer queryScorer = new QueryScorer(query, field);
542
543 Highlighter highlighter = new Highlighter(formatter, queryScorer);
544
545 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
546
547 TokenStream tokenStream = getAnalyzer().tokenStream(
548 field, new UnsyncStringReader(s));
549
550 try {
551 String snippet = highlighter.getBestFragments(
552 tokenStream, s, maxNumFragments, fragmentSuffix);
553
554 if (Validator.isNotNull(snippet) &&
555 !StringUtil.endsWith(snippet, fragmentSuffix) &&
556 !s.equals(snippet)) {
557
558 snippet = snippet.concat(fragmentSuffix);
559 }
560
561 return snippet;
562 }
563 catch (InvalidTokenOffsetsException itoe) {
564 throw new IOException(itoe.getMessage());
565 }
566 }
567
568 @Override
569 public Version getVersion() {
570 return _version;
571 }
572
573 @Override
574 public boolean isLoadIndexFromClusterEnabled() {
575 if (PropsValues.CLUSTER_LINK_ENABLED &&
576 PropsValues.LUCENE_REPLICATE_WRITE) {
577
578 return true;
579 }
580
581 if (_log.isDebugEnabled()) {
582 _log.debug("Load index from cluster is not enabled");
583 }
584
585 return false;
586 }
587
588 @Override
589 public void loadIndex(long companyId, InputStream inputStream)
590 throws IOException {
591
592 if (!isLoadIndexFromClusterEnabled()) {
593 return;
594 }
595
596 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
597
598 if (indexAccessor == null) {
599 if (_log.isInfoEnabled()) {
600 _log.info(
601 "Skip loading Lucene index files for company " + companyId +
602 " in favor of lazy loading");
603 }
604
605 return;
606 }
607
608 StopWatch stopWatch = null;
609
610 if (_log.isInfoEnabled()) {
611 _log.info(
612 "Start loading Lucene index files for company " + companyId);
613
614 stopWatch = new StopWatch();
615
616 stopWatch.start();
617 }
618
619 indexAccessor.loadIndex(inputStream);
620
621 if (_log.isInfoEnabled()) {
622 _log.info(
623 "Finished loading index files for company " + companyId +
624 " in " + stopWatch.getTime() + " ms");
625 }
626 }
627
628 @Override
629 public void loadIndexesFromCluster(long companyId) throws SystemException {
630 if (!isLoadIndexFromClusterEnabled()) {
631 return;
632 }
633
634 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
635
636 if (indexAccessor == null) {
637 return;
638 }
639
640 long localLastGeneration = getLastGeneration(companyId);
641
642 _loadIndexFromCluster(indexAccessor, localLastGeneration);
643 }
644
645 public void setAnalyzer(Analyzer analyzer) {
646 _analyzer = analyzer;
647 }
648
649 public void setVersion(Version version) {
650 _version = version;
651 }
652
653 @Override
654 public void shutdown() {
655 if (_luceneIndexThreadPoolExecutor != null) {
656 _luceneIndexThreadPoolExecutor.shutdownNow();
657
658 try {
659 _luceneIndexThreadPoolExecutor.awaitTermination(
660 60, TimeUnit.SECONDS);
661 }
662 catch (InterruptedException ie) {
663 _log.error("Lucene indexer shutdown interrupted", ie);
664 }
665 }
666
667 if (isLoadIndexFromClusterEnabled()) {
668 ClusterExecutorUtil.removeClusterEventListener(
669 _loadIndexClusterEventListener);
670 }
671
672 MessageBus messageBus = MessageBusUtil.getMessageBus();
673
674 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
675 String searchReaderDestinationName =
676 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
677
678 Destination searchReaderDestination = messageBus.getDestination(
679 searchReaderDestinationName);
680
681 if (searchReaderDestination != null) {
682 searchReaderDestination.close(true);
683 }
684 }
685
686 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
687 indexAccessor.close();
688 }
689 }
690
691 @Override
692 public void shutdown(long companyId) {
693 IndexAccessor indexAccessor = getIndexAccessor(companyId);
694
695 _indexAccessors.remove(indexAccessor);
696
697 indexAccessor.close();
698 }
699
700 @Override
701 public void startup(long companyId) {
702 if (!PropsValues.INDEX_ON_STARTUP) {
703 return;
704 }
705
706 if (_log.isInfoEnabled()) {
707 _log.info("Indexing Lucene on startup");
708 }
709
710 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
711
712 if (PropsValues.INDEX_WITH_THREAD) {
713 if (_luceneIndexThreadPoolExecutor == null) {
714
715
716
717
718
719 _luceneIndexThreadPoolExecutor =
720 PortalExecutorManagerUtil.getPortalExecutor(
721 LuceneHelperImpl.class.getName());
722 }
723
724 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
725 }
726 else {
727 luceneIndexer.reindex();
728 }
729 }
730
731 @Override
732 public void updateDocument(long companyId, Term term, Document document)
733 throws IOException {
734
735 IndexAccessor indexAccessor = getIndexAccessor(companyId);
736
737 indexAccessor.updateDocument(term, document);
738 }
739
/**
 * Creates the helper.
 *
 * <p>
 * The indexing executor is obtained when indexing on startup with a thread
 * is configured, a cluster event listener is registered when loading from
 * the cluster is enabled, and the maximum clause count of
 * {@code BooleanQuery} is set from the portal configuration.
 * </p>
 */
private LuceneHelperImpl() {
	boolean indexOnStartupWithThread =
		PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

	if (indexOnStartupWithThread) {
		_luceneIndexThreadPoolExecutor =
			PortalExecutorManagerUtil.getPortalExecutor(
				LuceneHelperImpl.class.getName());
	}

	if (isLoadIndexFromClusterEnabled()) {
		LoadIndexClusterEventListener loadIndexClusterEventListener =
			new LoadIndexClusterEventListener();

		_loadIndexClusterEventListener = loadIndexClusterEventListener;

		ClusterExecutorUtil.addClusterEventListener(
			loadIndexClusterEventListener);
	}

	BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
}
757
758 private ObjectValuePair<String, URL>
759 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
760 throws SystemException {
761
762 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
763 new MethodHandler(
764 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
765 bootupAddress);
766
767 FutureClusterResponses futureClusterResponses =
768 ClusterExecutorUtil.execute(clusterRequest);
769
770 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
771 futureClusterResponses.getPartialResults();
772
773 try {
774 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
775 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
776 TimeUnit.MILLISECONDS);
777
778 String transientToken = (String)clusterNodeResponse.getResult();
779
780 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
781
782 InetAddress inetAddress = clusterNode.getInetAddress();
783
784 String fileName = PortalUtil.getPathContext();
785
786 if (!fileName.endsWith(StringPool.SLASH)) {
787 fileName = fileName.concat(StringPool.SLASH);
788 }
789
790 fileName = fileName.concat("lucene/dump");
791
792 URL url = new URL(
793 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
794 fileName);
795
796 return new ObjectValuePair<String, URL>(transientToken, url);
797 }
798 catch (Exception e) {
799 throw new SystemException(e);
800 }
801 }
802
803 private void _includeIfUnique(
804 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
805 Query query, BooleanClause.Occur occur) {
806
807 if (query instanceof TermQuery) {
808 Set<Term> terms = new HashSet<Term>();
809
810 TermQuery termQuery = (TermQuery)query;
811
812 termQuery.extractTerms(terms);
813
814 float boost = termQuery.getBoost();
815
816 for (Term term : terms) {
817 String termValue = term.text();
818
819 if (like) {
820 termValue = termValue.toLowerCase(queryParser.getLocale());
821
822 term = term.createTerm(
823 StringPool.STAR.concat(termValue).concat(
824 StringPool.STAR));
825
826 query = new WildcardQuery(term);
827 }
828 else {
829 query = new TermQuery(term);
830 }
831
832 query.setBoost(boost);
833
834 boolean included = false;
835
836 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
837 if (query.equals(booleanClause.getQuery())) {
838 included = true;
839 }
840 }
841
842 if (!included) {
843 booleanQuery.add(query, occur);
844 }
845 }
846 }
847 else if (query instanceof BooleanQuery) {
848 BooleanQuery curBooleanQuery = (BooleanQuery)query;
849
850 BooleanQuery containerBooleanQuery = new BooleanQuery();
851
852 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
853 _includeIfUnique(
854 containerBooleanQuery, like, queryParser,
855 booleanClause.getQuery(), booleanClause.getOccur());
856 }
857
858 if (containerBooleanQuery.getClauses().length > 0) {
859 booleanQuery.add(containerBooleanQuery, occur);
860 }
861 }
862 else {
863 boolean included = false;
864
865 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
866 if (query.equals(booleanClause.getQuery())) {
867 included = true;
868 }
869 }
870
871 if (!included) {
872 booleanQuery.add(query, occur);
873 }
874 }
875 }
876
877 private void _loadIndexFromCluster(
878 IndexAccessor indexAccessor, long localLastGeneration)
879 throws SystemException {
880
881 List<Address> clusterNodeAddresses =
882 ClusterExecutorUtil.getClusterNodeAddresses();
883
884 int clusterNodeAddressesCount = clusterNodeAddresses.size();
885
886 if (clusterNodeAddressesCount <= 1) {
887 if (_log.isDebugEnabled()) {
888 _log.debug(
889 "Do not load indexes because there is either one portal " +
890 "instance or no portal instances in the cluster");
891 }
892
893 return;
894 }
895
896 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
897 new MethodHandler(
898 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
899 true);
900
901 ClusterExecutorUtil.execute(
902 clusterRequest,
903 new LoadIndexClusterResponseCallback(
904 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
905 }
906
907 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
908 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
909
910 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
911 GetterUtil.getInteger(
912 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
913 BooleanQuery.getMaxClauseCount());
914
915 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
916
917 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
918
919 private static MethodKey _createTokenMethodKey = new MethodKey(
920 TransientTokenUtil.class, "createToken", long.class);
921 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
922 LuceneHelperUtil.class, "getLastGeneration", long.class);
923
924 private Analyzer _analyzer;
925 private Map<Long, IndexAccessor> _indexAccessors =
926 new ConcurrentHashMap<Long, IndexAccessor>();
927 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
928 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
929 private Version _version;
930
931 private class LoadIndexClusterEventListener
932 implements ClusterEventListener {
933
934 @Override
935 public void processClusterEvent(ClusterEvent clusterEvent) {
936 ClusterEventType clusterEventType =
937 clusterEvent.getClusterEventType();
938
939 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
940 return;
941 }
942
943 List<Address> clusterNodeAddresses =
944 ClusterExecutorUtil.getClusterNodeAddresses();
945 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
946
947 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
948 if (_log.isDebugEnabled()) {
949 _log.debug(
950 "Number of original cluster members is greater than " +
951 "one");
952 }
953
954 return;
955 }
956
957 long[] companyIds = PortalInstances.getCompanyIds();
958
959 for (long companyId : companyIds) {
960 loadIndexes(companyId);
961 }
962
963 loadIndexes(CompanyConstants.SYSTEM);
964 }
965
966 private void loadIndexes(long companyId) {
967 long lastGeneration = getLastGeneration(companyId);
968
969 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
970 return;
971 }
972
973 try {
974 LuceneClusterUtil.loadIndexesFromCluster(companyId);
975 }
976 catch (Exception e) {
977 _log.error(
978 "Unable to load indexes for company " + companyId, e);
979 }
980 }
981
982 }
983
984 private class LoadIndexClusterResponseCallback
985 extends BaseClusterResponseCallback {
986
987 public LoadIndexClusterResponseCallback(
988 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
989 long localLastGeneration) {
990
991 _indexAccessor = indexAccessor;
992 _clusterNodeAddressesCount = clusterNodeAddressesCount;
993 _localLastGeneration = localLastGeneration;
994
995 _companyId = _indexAccessor.getCompanyId();
996 }
997
998 @Override
999 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1000 Address bootupAddress = null;
1001
1002 do {
1003 _clusterNodeAddressesCount--;
1004
1005 ClusterNodeResponse clusterNodeResponse = null;
1006
1007 try {
1008 clusterNodeResponse = blockingQueue.poll(
1009 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1010 TimeUnit.MILLISECONDS);
1011 }
1012 catch (Exception e) {
1013 _log.error("Unable to get cluster node response", e);
1014 }
1015
1016 if (clusterNodeResponse == null) {
1017 if (_log.isDebugEnabled()) {
1018 _log.debug(
1019 "Unable to get cluster node response in " +
1020 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1021 TimeUnit.MILLISECONDS);
1022 }
1023
1024 continue;
1025 }
1026
1027 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1028
1029 if (clusterNode.getPort() > 0) {
1030 try {
1031 long remoteLastGeneration =
1032 (Long)clusterNodeResponse.getResult();
1033
1034 if (remoteLastGeneration > _localLastGeneration) {
1035 bootupAddress = clusterNodeResponse.getAddress();
1036
1037 break;
1038 }
1039 }
1040 catch (Exception e) {
1041 if (_log.isDebugEnabled()) {
1042 _log.debug(
1043 "Suppress exception caused by remote method " +
1044 "invocation",
1045 e);
1046 }
1047
1048 continue;
1049 }
1050 }
1051 else {
1052 if (_log.isDebugEnabled()) {
1053 _log.debug(
1054 "Cluster node " + clusterNode +
1055 " has invalid port");
1056 }
1057 }
1058 }
1059 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1060
1061 if (bootupAddress == null) {
1062 return;
1063 }
1064
1065 if (_log.isInfoEnabled()) {
1066 _log.info(
1067 "Start loading lucene index files from cluster node " +
1068 bootupAddress);
1069 }
1070
1071 InputStream inputStream = null;
1072
1073 try {
1074 inputStream = getLoadIndexesInputStreamFromCluster(
1075 _companyId, bootupAddress);
1076
1077 _indexAccessor.loadIndex(inputStream);
1078
1079 if (_log.isInfoEnabled()) {
1080 _log.info("Lucene index files loaded successfully");
1081 }
1082 }
1083 catch (Exception e) {
1084 _log.error("Unable to load index for company " + _companyId, e);
1085 }
1086 finally {
1087 if (inputStream != null) {
1088 try {
1089 inputStream.close();
1090 }
1091 catch (IOException ioe) {
1092 _log.error(
1093 "Unable to close input stream for company " +
1094 _companyId,
1095 ioe);
1096 }
1097 }
1098 }
1099 }
1100
1101 @Override
1102 public void processTimeoutException(TimeoutException timeoutException) {
1103 _log.error(
1104 "Unable to load index for company " + _companyId,
1105 timeoutException);
1106 }
1107
1108 private int _clusterNodeAddressesCount;
1109 private long _companyId;
1110 private IndexAccessor _indexAccessor;
1111 private long _localLastGeneration;
1112
1113 }
1114
1115 }