001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.PropsKeys;
044 import com.liferay.portal.kernel.util.StringPool;
045 import com.liferay.portal.kernel.util.StringUtil;
046 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.model.CompanyConstants;
049 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051 import com.liferay.portal.security.auth.TransientTokenUtil;
052 import com.liferay.portal.util.PortalInstances;
053 import com.liferay.portal.util.PortalUtil;
054 import com.liferay.portal.util.PropsUtil;
055 import com.liferay.portal.util.PropsValues;
056 import com.liferay.util.lucene.KeywordsUtil;
057
058 import java.io.IOException;
059 import java.io.InputStream;
060 import java.io.OutputStream;
061
062 import java.net.InetAddress;
063 import java.net.URL;
064 import java.net.URLConnection;
065
066 import java.util.HashSet;
067 import java.util.List;
068 import java.util.Map;
069 import java.util.Set;
070 import java.util.concurrent.BlockingQueue;
071 import java.util.concurrent.ConcurrentHashMap;
072 import java.util.concurrent.TimeUnit;
073 import java.util.concurrent.TimeoutException;
074
075 import org.apache.commons.lang.time.StopWatch;
076 import org.apache.lucene.analysis.Analyzer;
077 import org.apache.lucene.analysis.TokenStream;
078 import org.apache.lucene.document.Document;
079 import org.apache.lucene.index.IndexReader;
080 import org.apache.lucene.index.Term;
081 import org.apache.lucene.queryParser.QueryParser;
082 import org.apache.lucene.search.BooleanClause;
083 import org.apache.lucene.search.BooleanQuery;
084 import org.apache.lucene.search.IndexSearcher;
085 import org.apache.lucene.search.NumericRangeQuery;
086 import org.apache.lucene.search.Query;
087 import org.apache.lucene.search.TermQuery;
088 import org.apache.lucene.search.TermRangeQuery;
089 import org.apache.lucene.search.WildcardQuery;
090 import org.apache.lucene.search.highlight.Formatter;
091 import org.apache.lucene.search.highlight.Highlighter;
092 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
093 import org.apache.lucene.search.highlight.QueryScorer;
094 import org.apache.lucene.search.highlight.SimpleFragmenter;
095 import org.apache.lucene.search.highlight.WeightedTerm;
096 import org.apache.lucene.util.Version;
097
098
106 public class LuceneHelperImpl implements LuceneHelper {
107
108 @Override
109 public void addDocument(long companyId, Document document)
110 throws IOException {
111
112 IndexAccessor indexAccessor = getIndexAccessor(companyId);
113
114 indexAccessor.addDocument(document);
115 }
116
117 @Override
118 public void addExactTerm(
119 BooleanQuery booleanQuery, String field, String value) {
120
121 addTerm(booleanQuery, field, value, false);
122 }
123
124 @Override
125 public void addNumericRangeTerm(
126 BooleanQuery booleanQuery, String field, String startValue,
127 String endValue) {
128
129 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
130 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
131 true, true);
132
133 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134 }
135
136 @Override
137 public void addRangeTerm(
138 BooleanQuery booleanQuery, String field, String startValue,
139 String endValue) {
140
141 boolean includesLower = true;
142
143 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
144 includesLower = false;
145 }
146
147 boolean includesUpper = true;
148
149 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
150 includesUpper = false;
151 }
152
153 TermRangeQuery termRangeQuery = new TermRangeQuery(
154 field, startValue, endValue, includesLower, includesUpper);
155
156 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
157 }
158
159 @Override
160 public void addRequiredTerm(
161 BooleanQuery booleanQuery, String field, String value, boolean like) {
162
163 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
164 }
165
166 @Override
167 public void addRequiredTerm(
168 BooleanQuery booleanQuery, String field, String[] values,
169 boolean like) {
170
171 if (values == null) {
172 return;
173 }
174
175 BooleanQuery query = new BooleanQuery();
176
177 for (String value : values) {
178 addTerm(query, field, value, like);
179 }
180
181 booleanQuery.add(query, BooleanClause.Occur.MUST);
182 }
183
184 @Override
185 public void addTerm(
186 BooleanQuery booleanQuery, String field, String value, boolean like) {
187
188 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
189 }
190
191 @Override
192 public void addTerm(
193 BooleanQuery booleanQuery, String field, String value, boolean like,
194 BooleanClauseOccur booleanClauseOccur) {
195
196 if (Validator.isNull(value)) {
197 return;
198 }
199
200 Analyzer analyzer = getAnalyzer();
201
202 if (analyzer instanceof PerFieldAnalyzer) {
203 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
204
205 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
206
207 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
208 like = true;
209 }
210 }
211
212 if (like) {
213 value = StringUtil.replace(
214 value, StringPool.PERCENT, StringPool.BLANK);
215 }
216
217 try {
218 QueryParser queryParser = new QueryParser(
219 getVersion(), field, analyzer);
220
221 Query query = queryParser.parse(value);
222
223 try {
224 if (like && (query instanceof TermQuery)) {
225
226
227
228 TermQuery termQuery = (TermQuery)query;
229
230 Term term = termQuery.getTerm();
231
232 value = term.text();
233 value = value.toLowerCase(queryParser.getLocale());
234
235 query = new TermQuery(term.createTerm(value));
236
237 query.setBoost(termQuery.getBoost());
238 }
239 }
240 catch (Exception e) {
241 query = queryParser.parse(KeywordsUtil.escape(value));
242 }
243
244 BooleanClause.Occur occur = null;
245
246 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
247 occur = BooleanClause.Occur.MUST;
248 }
249 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
250 occur = BooleanClause.Occur.MUST_NOT;
251 }
252 else {
253 occur = BooleanClause.Occur.SHOULD;
254 }
255
256 _includeIfUnique(booleanQuery, query, occur, like);
257 }
258 catch (Exception e) {
259 if (_log.isWarnEnabled()) {
260 _log.warn(e, e);
261 }
262 }
263 }
264
265 @Override
266 public void addTerm(
267 BooleanQuery booleanQuery, String field, String[] values,
268 boolean like) {
269
270 for (String value : values) {
271 addTerm(booleanQuery, field, value, like);
272 }
273 }
274
275 @Override
276 public void cleanUp(IndexSearcher indexSearcher) {
277 if (indexSearcher == null) {
278 return;
279 }
280
281 try {
282 indexSearcher.close();
283
284 IndexReader indexReader = indexSearcher.getIndexReader();
285
286 if (indexReader != null) {
287 indexReader.close();
288 }
289 }
290 catch (IOException ioe) {
291 _log.error(ioe, ioe);
292 }
293 }
294
295 @Override
296 public int countScoredFieldNames(Query query, String[] filedNames) {
297 int count = 0;
298
299 for (String fieldName : filedNames) {
300 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
301 query, false, fieldName);
302
303 if ((weightedTerms.length > 0) &&
304 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
305
306 count++;
307 }
308 }
309
310 return count;
311 }
312
313 @Override
314 public void delete(long companyId) {
315 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
316
317 if (indexAccessor == null) {
318 return;
319 }
320
321 indexAccessor.delete();
322 }
323
324 @Override
325 public void deleteDocuments(long companyId, Term term) throws IOException {
326 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327
328 if (indexAccessor == null) {
329 return;
330 }
331
332 indexAccessor.deleteDocuments(term);
333 }
334
335 @Override
336 public void dumpIndex(long companyId, OutputStream outputStream)
337 throws IOException {
338
339 long lastGeneration = getLastGeneration(companyId);
340
341 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
342 if (_log.isDebugEnabled()) {
343 _log.debug(
344 "Dump index from cluster is not enabled for " + companyId);
345 }
346
347 return;
348 }
349
350 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
351
352 if (indexAccessor == null) {
353 return;
354 }
355
356 indexAccessor.dumpIndex(outputStream);
357 }
358
359 @Override
360 public Analyzer getAnalyzer() {
361 return _analyzer;
362 }
363
364 @Override
365 public IndexAccessor getIndexAccessor(long companyId) {
366 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
367
368 if (indexAccessor != null) {
369 return indexAccessor;
370 }
371
372 synchronized (this) {
373 indexAccessor = _indexAccessors.get(companyId);
374
375 if (indexAccessor == null) {
376 indexAccessor = new IndexAccessorImpl(companyId);
377
378 if (isLoadIndexFromClusterEnabled()) {
379 indexAccessor = new SynchronizedIndexAccessorImpl(
380 indexAccessor);
381
382 boolean clusterForwardMessage = GetterUtil.getBoolean(
383 MessageValuesThreadLocal.getValue(
384 ClusterLink.CLUSTER_FORWARD_MESSAGE));
385
386 if (clusterForwardMessage) {
387 if (_log.isInfoEnabled()) {
388 _log.info(
389 "Skip Luncene index files cluster loading " +
390 "since this is a manual reindex request");
391 }
392 }
393 else {
394 try {
395 _loadIndexFromCluster(
396 indexAccessor,
397 indexAccessor.getLastGeneration());
398 }
399 catch (Exception e) {
400 _log.error(
401 "Unable to load index for company " +
402 indexAccessor.getCompanyId(),
403 e);
404 }
405 }
406 }
407
408 _indexAccessors.put(companyId, indexAccessor);
409 }
410 }
411
412 return indexAccessor;
413 }
414
415 @Override
416 public long getLastGeneration(long companyId) {
417 if (!isLoadIndexFromClusterEnabled()) {
418 return IndexAccessor.DEFAULT_LAST_GENERATION;
419 }
420
421 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
422
423 if (indexAccessor == null) {
424 return IndexAccessor.DEFAULT_LAST_GENERATION;
425 }
426
427 return indexAccessor.getLastGeneration();
428 }
429
430 @Override
431 public InputStream getLoadIndexesInputStreamFromCluster(
432 long companyId, Address bootupAddress)
433 throws SystemException {
434
435 if (!isLoadIndexFromClusterEnabled()) {
436 return null;
437 }
438
439 InputStream inputStream = null;
440
441 try {
442 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
443 _getBootupClusterNodeObjectValuePair(bootupAddress);
444
445 URL url = bootupClusterNodeObjectValuePair.getValue();
446
447 URLConnection urlConnection = url.openConnection();
448
449 urlConnection.setDoOutput(true);
450
451 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
452 urlConnection.getOutputStream());
453
454 unsyncPrintWriter.write("transientToken=");
455 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
456 unsyncPrintWriter.write("&companyId=");
457 unsyncPrintWriter.write(String.valueOf(companyId));
458
459 unsyncPrintWriter.close();
460
461 inputStream = urlConnection.getInputStream();
462
463 return inputStream;
464 }
465 catch (IOException ioe) {
466 throw new SystemException(ioe);
467 }
468 }
469
470 @Override
471 public Set<String> getQueryTerms(Query query) {
472 String queryString = StringUtil.replace(
473 query.toString(), StringPool.STAR, StringPool.BLANK);
474
475 Query tempQuery = null;
476
477 try {
478 QueryParser queryParser = new QueryParser(
479 getVersion(), StringPool.BLANK, getAnalyzer());
480
481 tempQuery = queryParser.parse(queryString);
482 }
483 catch (Exception e) {
484 if (_log.isWarnEnabled()) {
485 _log.warn("Unable to parse " + queryString);
486 }
487
488 tempQuery = query;
489 }
490
491 WeightedTerm[] weightedTerms = null;
492
493 for (String fieldName : Field.KEYWORDS) {
494 weightedTerms = QueryTermExtractor.getTerms(
495 tempQuery, false, fieldName);
496
497 if (weightedTerms.length > 0) {
498 break;
499 }
500 }
501
502 Set<String> queryTerms = new HashSet<String>();
503
504 for (WeightedTerm weightedTerm : weightedTerms) {
505 queryTerms.add(weightedTerm.getTerm());
506 }
507
508 return queryTerms;
509 }
510
511 @Override
512 public IndexSearcher getSearcher(long companyId, boolean readOnly)
513 throws IOException {
514
515 IndexAccessor indexAccessor = getIndexAccessor(companyId);
516
517 IndexReader indexReader = IndexReader.open(
518 indexAccessor.getLuceneDir(), readOnly);
519
520 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
521
522 indexSearcher.setDefaultFieldSortScoring(true, true);
523 indexSearcher.setSimilarity(new FieldWeightSimilarity());
524
525 return indexSearcher;
526 }
527
528 @Override
529 public String getSnippet(
530 Query query, String field, String s, int maxNumFragments,
531 int fragmentLength, String fragmentSuffix, Formatter formatter)
532 throws IOException {
533
534 QueryScorer queryScorer = new QueryScorer(query, field);
535
536 Highlighter highlighter = new Highlighter(formatter, queryScorer);
537
538 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
539
540 TokenStream tokenStream = getAnalyzer().tokenStream(
541 field, new UnsyncStringReader(s));
542
543 try {
544 String snippet = highlighter.getBestFragments(
545 tokenStream, s, maxNumFragments, fragmentSuffix);
546
547 if (Validator.isNotNull(snippet) &&
548 !StringUtil.endsWith(snippet, fragmentSuffix) &&
549 !s.equals(snippet)) {
550
551 snippet = snippet.concat(fragmentSuffix);
552 }
553
554 return snippet;
555 }
556 catch (InvalidTokenOffsetsException itoe) {
557 throw new IOException(itoe.getMessage());
558 }
559 }
560
561 @Override
562 public Version getVersion() {
563 return _version;
564 }
565
566 @Override
567 public boolean isLoadIndexFromClusterEnabled() {
568 if (PropsValues.CLUSTER_LINK_ENABLED &&
569 PropsValues.LUCENE_REPLICATE_WRITE) {
570
571 return true;
572 }
573
574 if (_log.isDebugEnabled()) {
575 _log.debug("Load index from cluster is not enabled");
576 }
577
578 return false;
579 }
580
581 @Override
582 public void loadIndex(long companyId, InputStream inputStream)
583 throws IOException {
584
585 if (!isLoadIndexFromClusterEnabled()) {
586 return;
587 }
588
589 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
590
591 if (indexAccessor == null) {
592 if (_log.isInfoEnabled()) {
593 _log.info(
594 "Skip loading Lucene index files for company " + companyId +
595 " in favor of lazy loading");
596 }
597
598 return;
599 }
600
601 StopWatch stopWatch = null;
602
603 if (_log.isInfoEnabled()) {
604 _log.info(
605 "Start loading Lucene index files for company " + companyId);
606
607 stopWatch = new StopWatch();
608
609 stopWatch.start();
610 }
611
612 indexAccessor.loadIndex(inputStream);
613
614 if (_log.isInfoEnabled()) {
615 _log.info(
616 "Finished loading index files for company " + companyId +
617 " in " + stopWatch.getTime() + " ms");
618 }
619 }
620
621 @Override
622 public void loadIndexesFromCluster(long companyId) throws SystemException {
623 if (!isLoadIndexFromClusterEnabled()) {
624 return;
625 }
626
627 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
628
629 if (indexAccessor == null) {
630 return;
631 }
632
633 long localLastGeneration = getLastGeneration(companyId);
634
635 _loadIndexFromCluster(indexAccessor, localLastGeneration);
636 }
637
638 public void setAnalyzer(Analyzer analyzer) {
639 _analyzer = analyzer;
640 }
641
642 public void setVersion(Version version) {
643 _version = version;
644 }
645
646 @Override
647 public void shutdown() {
648 if (_luceneIndexThreadPoolExecutor != null) {
649 _luceneIndexThreadPoolExecutor.shutdownNow();
650
651 try {
652 _luceneIndexThreadPoolExecutor.awaitTermination(
653 60, TimeUnit.SECONDS);
654 }
655 catch (InterruptedException ie) {
656 _log.error("Lucene indexer shutdown interrupted", ie);
657 }
658 }
659
660 if (isLoadIndexFromClusterEnabled()) {
661 ClusterExecutorUtil.removeClusterEventListener(
662 _loadIndexClusterEventListener);
663 }
664
665 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
666 indexAccessor.close();
667 }
668 }
669
670 @Override
671 public void shutdown(long companyId) {
672 IndexAccessor indexAccessor = getIndexAccessor(companyId);
673
674 _indexAccessors.remove(indexAccessor);
675
676 indexAccessor.close();
677 }
678
679 @Override
680 public void startup(long companyId) {
681 if (!PropsValues.INDEX_ON_STARTUP) {
682 return;
683 }
684
685 if (_log.isInfoEnabled()) {
686 _log.info("Indexing Lucene on startup");
687 }
688
689 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
690
691 if (PropsValues.INDEX_WITH_THREAD) {
692 if (_luceneIndexThreadPoolExecutor == null) {
693
694
695
696
697
698 _luceneIndexThreadPoolExecutor =
699 PortalExecutorManagerUtil.getPortalExecutor(
700 LuceneHelperImpl.class.getName());
701 }
702
703 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
704 }
705 else {
706 luceneIndexer.reindex();
707 }
708 }
709
710 @Override
711 public void updateDocument(long companyId, Term term, Document document)
712 throws IOException {
713
714 IndexAccessor indexAccessor = getIndexAccessor(companyId);
715
716 indexAccessor.updateDocument(term, document);
717 }
718
719 private LuceneHelperImpl() {
720 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
721 _luceneIndexThreadPoolExecutor =
722 PortalExecutorManagerUtil.getPortalExecutor(
723 LuceneHelperImpl.class.getName());
724 }
725
726 if (isLoadIndexFromClusterEnabled()) {
727 _loadIndexClusterEventListener =
728 new LoadIndexClusterEventListener();
729
730 ClusterExecutorUtil.addClusterEventListener(
731 _loadIndexClusterEventListener);
732 }
733
734 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
735 }
736
737 private ObjectValuePair<String, URL>
738 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
739 throws SystemException {
740
741 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
742 new MethodHandler(
743 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
744 bootupAddress);
745
746 FutureClusterResponses futureClusterResponses =
747 ClusterExecutorUtil.execute(clusterRequest);
748
749 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
750 futureClusterResponses.getPartialResults();
751
752 try {
753 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
754 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
755 TimeUnit.MILLISECONDS);
756
757 String transientToken = (String)clusterNodeResponse.getResult();
758
759 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
760
761 InetAddress inetAddress = clusterNode.getInetAddress();
762
763 String fileName = PortalUtil.getPathContext();
764
765 if (!fileName.endsWith(StringPool.SLASH)) {
766 fileName = fileName.concat(StringPool.SLASH);
767 }
768
769 fileName = fileName.concat("lucene/dump");
770
771 URL url = new URL(
772 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
773 fileName);
774
775 return new ObjectValuePair<String, URL>(transientToken, url);
776 }
777 catch (Exception e) {
778 throw new SystemException(e);
779 }
780 }
781
782 private void _includeIfUnique(
783 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
784 boolean like) {
785
786 if (query instanceof TermQuery) {
787 Set<Term> terms = new HashSet<Term>();
788
789 TermQuery termQuery = (TermQuery)query;
790
791 termQuery.extractTerms(terms);
792
793 float boost = termQuery.getBoost();
794
795 for (Term term : terms) {
796 String termValue = term.text();
797
798 if (like) {
799 term = term.createTerm(
800 StringPool.STAR.concat(termValue).concat(
801 StringPool.STAR));
802
803 query = new WildcardQuery(term);
804 }
805 else {
806 query = new TermQuery(term);
807 }
808
809 query.setBoost(boost);
810
811 boolean included = false;
812
813 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
814 if (query.equals(booleanClause.getQuery())) {
815 included = true;
816 }
817 }
818
819 if (!included) {
820 booleanQuery.add(query, occur);
821 }
822 }
823 }
824 else if (query instanceof BooleanQuery) {
825 BooleanQuery curBooleanQuery = (BooleanQuery)query;
826
827 BooleanQuery containerBooleanQuery = new BooleanQuery();
828
829 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
830 _includeIfUnique(
831 containerBooleanQuery, booleanClause.getQuery(),
832 booleanClause.getOccur(), like);
833 }
834
835 if (containerBooleanQuery.getClauses().length > 0) {
836 booleanQuery.add(containerBooleanQuery, occur);
837 }
838 }
839 else {
840 boolean included = false;
841
842 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
843 if (query.equals(booleanClause.getQuery())) {
844 included = true;
845 }
846 }
847
848 if (!included) {
849 booleanQuery.add(query, occur);
850 }
851 }
852 }
853
854 private void _loadIndexFromCluster(
855 IndexAccessor indexAccessor, long localLastGeneration)
856 throws SystemException {
857
858 List<Address> clusterNodeAddresses =
859 ClusterExecutorUtil.getClusterNodeAddresses();
860
861 int clusterNodeAddressesCount = clusterNodeAddresses.size();
862
863 if (clusterNodeAddressesCount <= 1) {
864 if (_log.isDebugEnabled()) {
865 _log.debug(
866 "Do not load indexes because there is either one portal " +
867 "instance or no portal instances in the cluster");
868 }
869
870 return;
871 }
872
873 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
874 new MethodHandler(
875 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
876 true);
877
878 ClusterExecutorUtil.execute(
879 clusterRequest,
880 new LoadIndexClusterResponseCallback(
881 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
882 }
883
884 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
885 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
886
887 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
888 GetterUtil.getInteger(
889 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
890 BooleanQuery.getMaxClauseCount());
891
892 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
893
894 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
895
896 private static MethodKey _createTokenMethodKey = new MethodKey(
897 TransientTokenUtil.class, "createToken", long.class);
898 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
899 LuceneHelperUtil.class, "getLastGeneration", long.class);
900
901 private Analyzer _analyzer;
902 private Map<Long, IndexAccessor> _indexAccessors =
903 new ConcurrentHashMap<Long, IndexAccessor>();
904 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
905 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
906 private Version _version;
907
908 private class LoadIndexClusterEventListener
909 implements ClusterEventListener {
910
911 @Override
912 public void processClusterEvent(ClusterEvent clusterEvent) {
913 ClusterEventType clusterEventType =
914 clusterEvent.getClusterEventType();
915
916 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
917 return;
918 }
919
920 List<Address> clusterNodeAddresses =
921 ClusterExecutorUtil.getClusterNodeAddresses();
922 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
923
924 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
925 if (_log.isDebugEnabled()) {
926 _log.debug(
927 "Number of original cluster members is greater than " +
928 "one");
929 }
930
931 return;
932 }
933
934 long[] companyIds = PortalInstances.getCompanyIds();
935
936 for (long companyId : companyIds) {
937 loadIndexes(companyId);
938 }
939
940 loadIndexes(CompanyConstants.SYSTEM);
941 }
942
943 private void loadIndexes(long companyId) {
944 long lastGeneration = getLastGeneration(companyId);
945
946 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
947 return;
948 }
949
950 try {
951 LuceneClusterUtil.loadIndexesFromCluster(companyId);
952 }
953 catch (Exception e) {
954 _log.error(
955 "Unable to load indexes for company " + companyId, e);
956 }
957 }
958
959 }
960
961 private class LoadIndexClusterResponseCallback
962 extends BaseClusterResponseCallback {
963
964 public LoadIndexClusterResponseCallback(
965 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
966 long localLastGeneration) {
967
968 _indexAccessor = indexAccessor;
969 _clusterNodeAddressesCount = clusterNodeAddressesCount;
970 _localLastGeneration = localLastGeneration;
971
972 _companyId = _indexAccessor.getCompanyId();
973 }
974
975 @Override
976 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
977 Address bootupAddress = null;
978
979 do {
980 _clusterNodeAddressesCount--;
981
982 ClusterNodeResponse clusterNodeResponse = null;
983
984 try {
985 clusterNodeResponse = blockingQueue.poll(
986 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
987 TimeUnit.MILLISECONDS);
988 }
989 catch (Exception e) {
990 _log.error("Unable to get cluster node response", e);
991 }
992
993 if (clusterNodeResponse == null) {
994 if (_log.isDebugEnabled()) {
995 _log.debug(
996 "Unable to get cluster node response in " +
997 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
998 TimeUnit.MILLISECONDS);
999 }
1000
1001 continue;
1002 }
1003
1004 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1005
1006 if (clusterNode.getPort() > 0) {
1007 try {
1008 long remoteLastGeneration =
1009 (Long)clusterNodeResponse.getResult();
1010
1011 if (remoteLastGeneration > _localLastGeneration) {
1012 bootupAddress = clusterNodeResponse.getAddress();
1013
1014 break;
1015 }
1016 }
1017 catch (Exception e) {
1018 if (_log.isDebugEnabled()) {
1019 _log.debug(
1020 "Suppress exception caused by remote method " +
1021 "invocation",
1022 e);
1023 }
1024
1025 continue;
1026 }
1027 }
1028 else {
1029 if (_log.isDebugEnabled()) {
1030 _log.debug(
1031 "Cluster node " + clusterNode +
1032 " has invalid port");
1033 }
1034 }
1035 }
1036 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1037
1038 if (bootupAddress == null) {
1039 return;
1040 }
1041
1042 if (_log.isInfoEnabled()) {
1043 _log.info(
1044 "Start loading lucene index files from cluster node " +
1045 bootupAddress);
1046 }
1047
1048 InputStream inputStream = null;
1049
1050 try {
1051 inputStream = getLoadIndexesInputStreamFromCluster(
1052 _companyId, bootupAddress);
1053
1054 _indexAccessor.loadIndex(inputStream);
1055
1056 if (_log.isInfoEnabled()) {
1057 _log.info("Lucene index files loaded successfully");
1058 }
1059 }
1060 catch (Exception e) {
1061 _log.error("Unable to load index for company " + _companyId, e);
1062 }
1063 finally {
1064 if (inputStream != null) {
1065 try {
1066 inputStream.close();
1067 }
1068 catch (IOException ioe) {
1069 _log.error(
1070 "Unable to close input stream for company " +
1071 _companyId,
1072 ioe);
1073 }
1074 }
1075 }
1076 }
1077
1078 @Override
1079 public void processTimeoutException(TimeoutException timeoutException) {
1080 _log.error(
1081 "Unable to load index for company " + _companyId,
1082 timeoutException);
1083 }
1084
1085 private int _clusterNodeAddressesCount;
1086 private long _companyId;
1087 private IndexAccessor _indexAccessor;
1088 private long _localLastGeneration;
1089
1090 }
1091
1092 }