001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.PropsKeys;
044 import com.liferay.portal.kernel.util.StringPool;
045 import com.liferay.portal.kernel.util.StringUtil;
046 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.model.CompanyConstants;
049 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051 import com.liferay.portal.security.auth.TransientTokenUtil;
052 import com.liferay.portal.util.PortalInstances;
053 import com.liferay.portal.util.PortalUtil;
054 import com.liferay.portal.util.PropsUtil;
055 import com.liferay.portal.util.PropsValues;
056 import com.liferay.util.lucene.KeywordsUtil;
057
058 import java.io.IOException;
059 import java.io.InputStream;
060 import java.io.OutputStream;
061
062 import java.net.InetAddress;
063 import java.net.URL;
064 import java.net.URLConnection;
065
066 import java.util.HashSet;
067 import java.util.List;
068 import java.util.Map;
069 import java.util.Set;
070 import java.util.concurrent.BlockingQueue;
071 import java.util.concurrent.ConcurrentHashMap;
072 import java.util.concurrent.TimeUnit;
073 import java.util.concurrent.TimeoutException;
074
075 import org.apache.commons.lang.time.StopWatch;
076 import org.apache.lucene.analysis.Analyzer;
077 import org.apache.lucene.analysis.TokenStream;
078 import org.apache.lucene.document.Document;
079 import org.apache.lucene.index.IndexReader;
080 import org.apache.lucene.index.Term;
081 import org.apache.lucene.queryParser.QueryParser;
082 import org.apache.lucene.search.BooleanClause;
083 import org.apache.lucene.search.BooleanQuery;
084 import org.apache.lucene.search.IndexSearcher;
085 import org.apache.lucene.search.NumericRangeQuery;
086 import org.apache.lucene.search.Query;
087 import org.apache.lucene.search.TermQuery;
088 import org.apache.lucene.search.TermRangeQuery;
089 import org.apache.lucene.search.WildcardQuery;
090 import org.apache.lucene.search.highlight.Highlighter;
091 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
092 import org.apache.lucene.search.highlight.QueryScorer;
093 import org.apache.lucene.search.highlight.SimpleFragmenter;
094 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
095 import org.apache.lucene.search.highlight.WeightedTerm;
096 import org.apache.lucene.util.Version;
097
098
106 public class LuceneHelperImpl implements LuceneHelper {
107
108 @Override
109 public void addDocument(long companyId, Document document)
110 throws IOException {
111
112 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
113
114 indexAccessor.addDocument(document);
115 }
116
117 @Override
118 public void addExactTerm(
119 BooleanQuery booleanQuery, String field, String value) {
120
121 addTerm(booleanQuery, field, value, false);
122 }
123
124 @Override
125 public void addNumericRangeTerm(
126 BooleanQuery booleanQuery, String field, String startValue,
127 String endValue) {
128
129 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
130 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
131 true, true);
132
133 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134 }
135
136 @Override
137 public void addRangeTerm(
138 BooleanQuery booleanQuery, String field, String startValue,
139 String endValue) {
140
141 boolean includesLower = true;
142
143 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
144 includesLower = false;
145 }
146
147 boolean includesUpper = true;
148
149 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
150 includesUpper = false;
151 }
152
153 TermRangeQuery termRangeQuery = new TermRangeQuery(
154 field, startValue, endValue, includesLower, includesUpper);
155
156 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
157 }
158
159 @Override
160 public void addRequiredTerm(
161 BooleanQuery booleanQuery, String field, String value, boolean like) {
162
163 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
164 }
165
166 @Override
167 public void addRequiredTerm(
168 BooleanQuery booleanQuery, String field, String[] values,
169 boolean like) {
170
171 if (values == null) {
172 return;
173 }
174
175 BooleanQuery query = new BooleanQuery();
176
177 for (String value : values) {
178 addTerm(query, field, value, like);
179 }
180
181 booleanQuery.add(query, BooleanClause.Occur.MUST);
182 }
183
184 @Override
185 public void addTerm(
186 BooleanQuery booleanQuery, String field, String value, boolean like) {
187
188 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
189 }
190
191 @Override
192 public void addTerm(
193 BooleanQuery booleanQuery, String field, String value, boolean like,
194 BooleanClauseOccur booleanClauseOccur) {
195
196 if (Validator.isNull(value)) {
197 return;
198 }
199
200 Analyzer analyzer = getAnalyzer();
201
202 if (analyzer instanceof PerFieldAnalyzer) {
203 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
204
205 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
206
207 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
208 like = true;
209 }
210 }
211
212 if (like) {
213 value = StringUtil.replace(
214 value, StringPool.PERCENT, StringPool.BLANK);
215 }
216
217 try {
218 QueryParser queryParser = new QueryParser(
219 getVersion(), field, analyzer);
220
221 Query query = queryParser.parse(value);
222
223 try {
224 if (like && (query instanceof TermQuery)) {
225
226
227
228 TermQuery termQuery = (TermQuery)query;
229
230 Term term = termQuery.getTerm();
231
232 value = term.text();
233 value = value.toLowerCase(queryParser.getLocale());
234
235 query = new TermQuery(term.createTerm(value));
236 }
237 }
238 catch (Exception e) {
239 query = queryParser.parse(KeywordsUtil.escape(value));
240 }
241
242 BooleanClause.Occur occur = null;
243
244 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
245 occur = BooleanClause.Occur.MUST;
246 }
247 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
248 occur = BooleanClause.Occur.MUST_NOT;
249 }
250 else {
251 occur = BooleanClause.Occur.SHOULD;
252 }
253
254 _includeIfUnique(booleanQuery, query, occur, like);
255 }
256 catch (Exception e) {
257 _log.error(e, e);
258 }
259 }
260
261 @Override
262 public void addTerm(
263 BooleanQuery booleanQuery, String field, String[] values,
264 boolean like) {
265
266 for (String value : values) {
267 addTerm(booleanQuery, field, value, like);
268 }
269 }
270
271 @Override
272 public int countScoredFieldNames(Query query, String[] filedNames) {
273 int count = 0;
274
275 for (String fieldName : filedNames) {
276 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
277 query, false, fieldName);
278
279 if ((weightedTerms.length > 0) &&
280 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
281
282 count++;
283 }
284 }
285
286 return count;
287 }
288
289 @Override
290 public void delete(long companyId) {
291 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
292
293 if (indexAccessor == null) {
294 return;
295 }
296
297 indexAccessor.delete();
298 }
299
300 @Override
301 public void deleteDocuments(long companyId, Term term) throws IOException {
302 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
303
304 if (indexAccessor == null) {
305 return;
306 }
307
308 indexAccessor.deleteDocuments(term);
309 }
310
311 @Override
312 public void dumpIndex(long companyId, OutputStream outputStream)
313 throws IOException {
314
315 long lastGeneration = getLastGeneration(companyId);
316
317 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
318 if (_log.isDebugEnabled()) {
319 _log.debug(
320 "Dump index from cluster is not enabled for " + companyId);
321 }
322
323 return;
324 }
325
326 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327
328 if (indexAccessor == null) {
329 return;
330 }
331
332 indexAccessor.dumpIndex(outputStream);
333 }
334
335 @Override
336 public Analyzer getAnalyzer() {
337 return _analyzer;
338 }
339
340 @Override
341 public long getLastGeneration(long companyId) {
342 if (!isLoadIndexFromClusterEnabled()) {
343 return IndexAccessor.DEFAULT_LAST_GENERATION;
344 }
345
346 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347
348 if (indexAccessor == null) {
349 return IndexAccessor.DEFAULT_LAST_GENERATION;
350 }
351
352 return indexAccessor.getLastGeneration();
353 }
354
355 @Override
356 public InputStream getLoadIndexesInputStreamFromCluster(
357 long companyId, Address bootupAddress)
358 throws SystemException {
359
360 if (!isLoadIndexFromClusterEnabled()) {
361 return null;
362 }
363
364 InputStream inputStream = null;
365
366 try {
367 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
368 _getBootupClusterNodeObjectValuePair(bootupAddress);
369
370 URL url = bootupClusterNodeObjectValuePair.getValue();
371
372 URLConnection urlConnection = url.openConnection();
373
374 urlConnection.setDoOutput(true);
375
376 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
377 urlConnection.getOutputStream());
378
379 unsyncPrintWriter.write("transientToken=");
380 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
381 unsyncPrintWriter.write("&companyId=");
382 unsyncPrintWriter.write(String.valueOf(companyId));
383
384 unsyncPrintWriter.close();
385
386 inputStream = urlConnection.getInputStream();
387
388 return inputStream;
389 }
390 catch (IOException ioe) {
391 throw new SystemException(ioe);
392 }
393 }
394
395 @Override
396 public String[] getQueryTerms(Query query) {
397 String queryString = StringUtil.replace(
398 query.toString(), StringPool.STAR, StringPool.BLANK);
399
400 Query tempQuery = null;
401
402 try {
403 QueryParser queryParser = new QueryParser(
404 getVersion(), StringPool.BLANK, getAnalyzer());
405
406 tempQuery = queryParser.parse(queryString);
407 }
408 catch (Exception e) {
409 if (_log.isWarnEnabled()) {
410 _log.warn("Unable to parse " + queryString);
411 }
412
413 tempQuery = query;
414 }
415
416 WeightedTerm[] weightedTerms = null;
417
418 for (String fieldName : Field.KEYWORDS) {
419 weightedTerms = QueryTermExtractor.getTerms(
420 tempQuery, false, fieldName);
421
422 if (weightedTerms.length > 0) {
423 break;
424 }
425 }
426
427 Set<String> queryTerms = new HashSet<String>();
428
429 for (WeightedTerm weightedTerm : weightedTerms) {
430 queryTerms.add(weightedTerm.getTerm());
431 }
432
433 return queryTerms.toArray(new String[queryTerms.size()]);
434 }
435
436 @Override
437 public IndexSearcher getSearcher(long companyId, boolean readOnly)
438 throws IOException {
439
440 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
441
442 IndexReader indexReader = IndexReader.open(
443 indexAccessor.getLuceneDir(), readOnly);
444
445 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
446
447 indexSearcher.setDefaultFieldSortScoring(true, true);
448 indexSearcher.setSimilarity(new FieldWeightSimilarity());
449
450 return indexSearcher;
451 }
452
453 @Override
454 public String getSnippet(
455 Query query, String field, String s, int maxNumFragments,
456 int fragmentLength, String fragmentSuffix, String preTag,
457 String postTag)
458 throws IOException {
459
460 SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
461 preTag, postTag);
462
463 QueryScorer queryScorer = new QueryScorer(query, field);
464
465 Highlighter highlighter = new Highlighter(
466 simpleHTMLFormatter, queryScorer);
467
468 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
469
470 TokenStream tokenStream = getAnalyzer().tokenStream(
471 field, new UnsyncStringReader(s));
472
473 try {
474 String snippet = highlighter.getBestFragments(
475 tokenStream, s, maxNumFragments, fragmentSuffix);
476
477 if (Validator.isNotNull(snippet) &&
478 !StringUtil.endsWith(snippet, fragmentSuffix) &&
479 !s.equals(snippet)) {
480
481 snippet = snippet.concat(fragmentSuffix);
482 }
483
484 return snippet;
485 }
486 catch (InvalidTokenOffsetsException itoe) {
487 throw new IOException(itoe.getMessage());
488 }
489 }
490
491 @Override
492 public Version getVersion() {
493 return _version;
494 }
495
496 @Override
497 public boolean isLoadIndexFromClusterEnabled() {
498 if (PropsValues.CLUSTER_LINK_ENABLED &&
499 PropsValues.LUCENE_REPLICATE_WRITE) {
500
501 return true;
502 }
503
504 if (_log.isDebugEnabled()) {
505 _log.debug("Load index from cluster is not enabled");
506 }
507
508 return false;
509 }
510
511 @Override
512 public void loadIndex(long companyId, InputStream inputStream)
513 throws IOException {
514
515 if (!isLoadIndexFromClusterEnabled()) {
516 return;
517 }
518
519 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
520
521 if (indexAccessor == null) {
522 if (_log.isInfoEnabled()) {
523 _log.info(
524 "Skip loading Lucene index files for company " + companyId +
525 " in favor of lazy loading");
526 }
527
528 return;
529 }
530
531 StopWatch stopWatch = null;
532
533 if (_log.isInfoEnabled()) {
534 _log.info(
535 "Start loading Lucene index files for company " + companyId);
536
537 stopWatch = new StopWatch();
538
539 stopWatch.start();
540 }
541
542 indexAccessor.loadIndex(inputStream);
543
544 if (_log.isInfoEnabled()) {
545 _log.info(
546 "Finished loading index files for company " + companyId +
547 " in " + stopWatch.getTime() + " ms");
548 }
549 }
550
551 @Override
552 public void loadIndexesFromCluster(long companyId) throws SystemException {
553 if (!isLoadIndexFromClusterEnabled()) {
554 return;
555 }
556
557 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
558
559 if (indexAccessor == null) {
560 return;
561 }
562
563 long localLastGeneration = getLastGeneration(companyId);
564
565 _loadIndexFromCluster(indexAccessor, localLastGeneration);
566 }
567
568 public void setAnalyzer(Analyzer analyzer) {
569 _analyzer = analyzer;
570 }
571
572 public void setVersion(Version version) {
573 _version = version;
574 }
575
576 @Override
577 public void shutdown() {
578 if (_luceneIndexThreadPoolExecutor != null) {
579 _luceneIndexThreadPoolExecutor.shutdownNow();
580
581 try {
582 _luceneIndexThreadPoolExecutor.awaitTermination(
583 60, TimeUnit.SECONDS);
584 }
585 catch (InterruptedException ie) {
586 _log.error("Lucene indexer shutdown interrupted", ie);
587 }
588 }
589
590 if (isLoadIndexFromClusterEnabled()) {
591 ClusterExecutorUtil.removeClusterEventListener(
592 _loadIndexClusterEventListener);
593 }
594
595 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
596 indexAccessor.close();
597 }
598 }
599
600 @Override
601 public void startup(long companyId) {
602 if (!PropsValues.INDEX_ON_STARTUP) {
603 return;
604 }
605
606 if (_log.isInfoEnabled()) {
607 _log.info("Indexing Lucene on startup");
608 }
609
610 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
611
612 if (PropsValues.INDEX_WITH_THREAD) {
613 if (_luceneIndexThreadPoolExecutor == null) {
614
615
616
617
618
619 _luceneIndexThreadPoolExecutor =
620 PortalExecutorManagerUtil.getPortalExecutor(
621 LuceneHelperImpl.class.getName());
622 }
623
624 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
625 }
626 else {
627 luceneIndexer.reindex();
628 }
629 }
630
631 @Override
632 public void updateDocument(long companyId, Term term, Document document)
633 throws IOException {
634
635 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
636
637 indexAccessor.updateDocument(term, document);
638 }
639
/**
 * Creates the helper. Looks up the indexing executor when background
 * indexing on startup is configured, registers the cluster event listener
 * when loading from the cluster is enabled, and applies the configured
 * maximum boolean query clause count.
 */
private LuceneHelperImpl() {
    boolean indexInBackground =
        PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

    if (indexInBackground) {
        String executorName = LuceneHelperImpl.class.getName();

        _luceneIndexThreadPoolExecutor =
            PortalExecutorManagerUtil.getPortalExecutor(executorName);
    }

    if (isLoadIndexFromClusterEnabled()) {
        LoadIndexClusterEventListener listener =
            new LoadIndexClusterEventListener();

        _loadIndexClusterEventListener = listener;

        ClusterExecutorUtil.addClusterEventListener(listener);
    }

    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
}
657
658 private ObjectValuePair<String, URL>
659 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
660 throws SystemException {
661
662 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
663 new MethodHandler(
664 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
665 bootupAddress);
666
667 FutureClusterResponses futureClusterResponses =
668 ClusterExecutorUtil.execute(clusterRequest);
669
670 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
671 futureClusterResponses.getPartialResults();
672
673 try {
674 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
675 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
676 TimeUnit.MILLISECONDS);
677
678 String transientToken = (String)clusterNodeResponse.getResult();
679
680 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
681
682 InetAddress inetAddress = clusterNode.getInetAddress();
683
684 String fileName = PortalUtil.getPathContext();
685
686 if (!fileName.endsWith(StringPool.SLASH)) {
687 fileName = fileName.concat(StringPool.SLASH);
688 }
689
690 fileName = fileName.concat("lucene/dump");
691
692 URL url = new URL(
693 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
694 fileName);
695
696 return new ObjectValuePair<String, URL>(transientToken, url);
697 }
698 catch (Exception e) {
699 throw new SystemException(e);
700 }
701 }
702
703 private IndexAccessor _getIndexAccessor(long companyId) {
704 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
705
706 if (indexAccessor != null) {
707 return indexAccessor;
708 }
709
710 synchronized (this) {
711 indexAccessor = _indexAccessors.get(companyId);
712
713 if (indexAccessor == null) {
714 indexAccessor = new IndexAccessorImpl(companyId);
715
716 if (isLoadIndexFromClusterEnabled()) {
717 boolean clusterForwardMessage = GetterUtil.getBoolean(
718 MessageValuesThreadLocal.getValue(
719 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
720
721 if (clusterForwardMessage) {
722 if (_log.isInfoEnabled()) {
723 _log.info(
724 "Skip Luncene index files cluster loading " +
725 "since this is a manual reindex request");
726 }
727 }
728 else {
729 indexAccessor = new SynchronizedIndexAccessorImpl(
730 indexAccessor);
731
732 try {
733 _loadIndexFromCluster(
734 indexAccessor,
735 indexAccessor.getLastGeneration());
736 }
737 catch (Exception e) {
738 _log.error(
739 "Unable to load index for company " +
740 indexAccessor.getCompanyId(),
741 e);
742 }
743 }
744 }
745
746 _indexAccessors.put(companyId, indexAccessor);
747 }
748 }
749
750 return indexAccessor;
751 }
752
753 private void _includeIfUnique(
754 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
755 boolean like) {
756
757 if (query instanceof TermQuery) {
758 Set<Term> terms = new HashSet<Term>();
759
760 query.extractTerms(terms);
761
762 for (Term term : terms) {
763 String termValue = term.text();
764
765 if (like) {
766 term = term.createTerm(
767 StringPool.STAR.concat(termValue).concat(
768 StringPool.STAR));
769
770 query = new WildcardQuery(term);
771 }
772 else {
773 query = new TermQuery(term);
774 }
775
776 boolean included = false;
777
778 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
779 if (query.equals(booleanClause.getQuery())) {
780 included = true;
781 }
782 }
783
784 if (!included) {
785 booleanQuery.add(query, occur);
786 }
787 }
788 }
789 else if (query instanceof BooleanQuery) {
790 BooleanQuery curBooleanQuery = (BooleanQuery)query;
791
792 BooleanQuery containerBooleanQuery = new BooleanQuery();
793
794 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
795 _includeIfUnique(
796 containerBooleanQuery, booleanClause.getQuery(),
797 booleanClause.getOccur(), like);
798 }
799
800 if (containerBooleanQuery.getClauses().length > 0) {
801 booleanQuery.add(containerBooleanQuery, occur);
802 }
803 }
804 else {
805 boolean included = false;
806
807 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
808 if (query.equals(booleanClause.getQuery())) {
809 included = true;
810 }
811 }
812
813 if (!included) {
814 booleanQuery.add(query, occur);
815 }
816 }
817 }
818
819 private void _loadIndexFromCluster(
820 IndexAccessor indexAccessor, long localLastGeneration)
821 throws SystemException {
822
823 List<Address> clusterNodeAddresses =
824 ClusterExecutorUtil.getClusterNodeAddresses();
825
826 int clusterNodeAddressesCount = clusterNodeAddresses.size();
827
828 if (clusterNodeAddressesCount <= 1) {
829 if (_log.isDebugEnabled()) {
830 _log.debug(
831 "Do not load indexes because there is either one portal " +
832 "instance or no portal instances in the cluster");
833 }
834
835 return;
836 }
837
838 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
839 new MethodHandler(
840 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
841 true);
842
843 ClusterExecutorUtil.execute(
844 clusterRequest,
845 new LoadIndexClusterResponseCallback(
846 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
847 }
848
849 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
850 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
851
852 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
853 GetterUtil.getInteger(
854 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
855 BooleanQuery.getMaxClauseCount());
856
857 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
858
859 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
860
861 private static MethodKey _createTokenMethodKey =
862 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
863 long.class);
864 private static MethodKey _getLastGenerationMethodKey =
865 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
866 long.class);
867
868 private Analyzer _analyzer;
869 private Map<Long, IndexAccessor> _indexAccessors =
870 new ConcurrentHashMap<Long, IndexAccessor>();
871 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
872 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
873 private Version _version;
874
875 private class LoadIndexClusterEventListener
876 implements ClusterEventListener {
877
878 @Override
879 public void processClusterEvent(ClusterEvent clusterEvent) {
880 ClusterEventType clusterEventType =
881 clusterEvent.getClusterEventType();
882
883 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
884 return;
885 }
886
887 List<Address> clusterNodeAddresses =
888 ClusterExecutorUtil.getClusterNodeAddresses();
889 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
890
891 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
892 if (_log.isDebugEnabled()) {
893 _log.debug(
894 "Number of original cluster members is greater than " +
895 "one");
896 }
897
898 return;
899 }
900
901 long[] companyIds = PortalInstances.getCompanyIds();
902
903 for (long companyId : companyIds) {
904 loadIndexes(companyId);
905 }
906
907 loadIndexes(CompanyConstants.SYSTEM);
908 }
909
910 private void loadIndexes(long companyId) {
911 long lastGeneration = getLastGeneration(companyId);
912
913 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
914 return;
915 }
916
917 try {
918 LuceneClusterUtil.loadIndexesFromCluster(companyId);
919 }
920 catch (Exception e) {
921 _log.error(
922 "Unable to load indexes for company " + companyId, e);
923 }
924 }
925
926 }
927
928 private class LoadIndexClusterResponseCallback
929 extends BaseClusterResponseCallback {
930
931 public LoadIndexClusterResponseCallback(
932 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
933 long localLastGeneration) {
934
935 _indexAccessor = indexAccessor;
936 _clusterNodeAddressesCount = clusterNodeAddressesCount;
937 _localLastGeneration = localLastGeneration;
938
939 _companyId = _indexAccessor.getCompanyId();
940 }
941
942 @Override
943 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
944 Address bootupAddress = null;
945
946 do {
947 _clusterNodeAddressesCount--;
948
949 ClusterNodeResponse clusterNodeResponse = null;
950
951 try {
952 clusterNodeResponse = blockingQueue.poll(
953 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
954 TimeUnit.MILLISECONDS);
955 }
956 catch (Exception e) {
957 _log.error("Unable to get cluster node response", e);
958 }
959
960 if (clusterNodeResponse == null) {
961 if (_log.isDebugEnabled()) {
962 _log.debug(
963 "Unable to get cluster node response in " +
964 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
965 TimeUnit.MILLISECONDS);
966 }
967
968 continue;
969 }
970
971 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
972
973 if (clusterNode.getPort() > 0) {
974 try {
975 long remoteLastGeneration =
976 (Long)clusterNodeResponse.getResult();
977
978 if (remoteLastGeneration > _localLastGeneration) {
979 bootupAddress = clusterNodeResponse.getAddress();
980
981 break;
982 }
983 }
984 catch (Exception e) {
985 if (_log.isDebugEnabled()) {
986 _log.debug(
987 "Suppress exception caused by remote method " +
988 "invocation",
989 e);
990 }
991
992 continue;
993 }
994 }
995 else {
996 if (_log.isDebugEnabled()) {
997 _log.debug(
998 "Cluster node " + clusterNode +
999 " has invalid port");
1000 }
1001 }
1002 }
1003 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1004
1005 if (bootupAddress == null) {
1006 return;
1007 }
1008
1009 if (_log.isInfoEnabled()) {
1010 _log.info(
1011 "Start loading lucene index files from cluster node " +
1012 bootupAddress);
1013 }
1014
1015 InputStream inputStream = null;
1016
1017 try {
1018 inputStream = getLoadIndexesInputStreamFromCluster(
1019 _companyId, bootupAddress);
1020
1021 _indexAccessor.loadIndex(inputStream);
1022
1023 if (_log.isInfoEnabled()) {
1024 _log.info("Lucene index files loaded successfully");
1025 }
1026 }
1027 catch (Exception e) {
1028 _log.error("Unable to load index for company " + _companyId, e);
1029 }
1030 finally {
1031 if (inputStream != null) {
1032 try {
1033 inputStream.close();
1034 }
1035 catch (IOException ioe) {
1036 _log.error(
1037 "Unable to close input stream for company " +
1038 _companyId,
1039 ioe);
1040 }
1041 }
1042 }
1043 }
1044
1045 @Override
1046 public void processTimeoutException(TimeoutException timeoutException) {
1047 _log.error(
1048 "Unable to load index for company " + _companyId,
1049 timeoutException);
1050 }
1051
1052 private int _clusterNodeAddressesCount;
1053 private long _companyId;
1054 private IndexAccessor _indexAccessor;
1055 private long _localLastGeneration;
1056
1057 }
1058
1059 }