001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.PropsKeys;
044 import com.liferay.portal.kernel.util.StringPool;
045 import com.liferay.portal.kernel.util.StringUtil;
046 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.model.CompanyConstants;
049 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051 import com.liferay.portal.security.auth.TransientTokenUtil;
052 import com.liferay.portal.util.PortalInstances;
053 import com.liferay.portal.util.PortalUtil;
054 import com.liferay.portal.util.PropsUtil;
055 import com.liferay.portal.util.PropsValues;
056 import com.liferay.util.lucene.KeywordsUtil;
057
058 import java.io.IOException;
059 import java.io.InputStream;
060 import java.io.OutputStream;
061
062 import java.net.InetAddress;
063 import java.net.URL;
064 import java.net.URLConnection;
065
066 import java.util.HashSet;
067 import java.util.List;
068 import java.util.Map;
069 import java.util.Set;
070 import java.util.concurrent.BlockingQueue;
071 import java.util.concurrent.ConcurrentHashMap;
072 import java.util.concurrent.TimeUnit;
073 import java.util.concurrent.TimeoutException;
074
075 import org.apache.commons.lang.time.StopWatch;
076 import org.apache.lucene.analysis.Analyzer;
077 import org.apache.lucene.analysis.TokenStream;
078 import org.apache.lucene.document.Document;
079 import org.apache.lucene.index.IndexReader;
080 import org.apache.lucene.index.Term;
081 import org.apache.lucene.queryParser.QueryParser;
082 import org.apache.lucene.search.BooleanClause;
083 import org.apache.lucene.search.BooleanQuery;
084 import org.apache.lucene.search.IndexSearcher;
085 import org.apache.lucene.search.NumericRangeQuery;
086 import org.apache.lucene.search.Query;
087 import org.apache.lucene.search.TermQuery;
088 import org.apache.lucene.search.TermRangeQuery;
089 import org.apache.lucene.search.WildcardQuery;
090 import org.apache.lucene.search.highlight.Formatter;
091 import org.apache.lucene.search.highlight.Highlighter;
092 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
093 import org.apache.lucene.search.highlight.QueryScorer;
094 import org.apache.lucene.search.highlight.SimpleFragmenter;
095 import org.apache.lucene.search.highlight.WeightedTerm;
096 import org.apache.lucene.util.Version;
097
098
107 public class LuceneHelperImpl implements LuceneHelper {
108
109 @Override
110 public void addDocument(long companyId, Document document)
111 throws IOException {
112
113 IndexAccessor indexAccessor = getIndexAccessor(companyId);
114
115 indexAccessor.addDocument(document);
116 }
117
118 @Override
119 public void addExactTerm(
120 BooleanQuery booleanQuery, String field, String value) {
121
122 addTerm(booleanQuery, field, value, false);
123 }
124
125 @Override
126 public void addNumericRangeTerm(
127 BooleanQuery booleanQuery, String field, Integer startValue,
128 Integer endValue) {
129
130 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
131 field, startValue, endValue, true, true);
132
133 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134 }
135
136 @Override
137 public void addNumericRangeTerm(
138 BooleanQuery booleanQuery, String field, Long startValue,
139 Long endValue) {
140
141 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
142 field, startValue, endValue, true, true);
143
144 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
145 }
146
147
151 @Override
152 public void addNumericRangeTerm(
153 BooleanQuery booleanQuery, String field, String startValue,
154 String endValue) {
155
156 addNumericRangeTerm(
157 booleanQuery, field, GetterUtil.getLong(startValue),
158 GetterUtil.getLong(endValue));
159 }
160
161 @Override
162 public void addRangeTerm(
163 BooleanQuery booleanQuery, String field, String startValue,
164 String endValue) {
165
166 boolean includesLower = true;
167
168 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
169 includesLower = false;
170 }
171
172 boolean includesUpper = true;
173
174 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
175 includesUpper = false;
176 }
177
178 TermRangeQuery termRangeQuery = new TermRangeQuery(
179 field, startValue, endValue, includesLower, includesUpper);
180
181 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
182 }
183
184 @Override
185 public void addRequiredTerm(
186 BooleanQuery booleanQuery, String field, String value, boolean like) {
187
188 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
189 }
190
191 @Override
192 public void addRequiredTerm(
193 BooleanQuery booleanQuery, String field, String[] values,
194 boolean like) {
195
196 if (values == null) {
197 return;
198 }
199
200 BooleanQuery query = new BooleanQuery();
201
202 for (String value : values) {
203 addTerm(query, field, value, like);
204 }
205
206 booleanQuery.add(query, BooleanClause.Occur.MUST);
207 }
208
209 @Override
210 public void addTerm(
211 BooleanQuery booleanQuery, String field, String value, boolean like) {
212
213 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
214 }
215
216 @Override
217 public void addTerm(
218 BooleanQuery booleanQuery, String field, String value, boolean like,
219 BooleanClauseOccur booleanClauseOccur) {
220
221 if (Validator.isNull(value)) {
222 return;
223 }
224
225 Analyzer analyzer = getAnalyzer();
226
227 if (analyzer instanceof PerFieldAnalyzer) {
228 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
229
230 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
231
232 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
233 like = true;
234 }
235 }
236
237 if (like) {
238 value = StringUtil.replace(
239 value, StringPool.PERCENT, StringPool.BLANK);
240 }
241
242 try {
243 QueryParser queryParser = new QueryParser(
244 getVersion(), field, analyzer);
245
246 Query query = queryParser.parse(value);
247
248 try {
249 if (like && (query instanceof TermQuery)) {
250
251
252
253 TermQuery termQuery = (TermQuery)query;
254
255 Term term = termQuery.getTerm();
256
257 value = term.text();
258 value = value.toLowerCase(queryParser.getLocale());
259
260 query = new TermQuery(term.createTerm(value));
261
262 query.setBoost(termQuery.getBoost());
263 }
264 }
265 catch (Exception e) {
266 query = queryParser.parse(KeywordsUtil.escape(value));
267 }
268
269 BooleanClause.Occur occur = null;
270
271 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
272 occur = BooleanClause.Occur.MUST;
273 }
274 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
275 occur = BooleanClause.Occur.MUST_NOT;
276 }
277 else {
278 occur = BooleanClause.Occur.SHOULD;
279 }
280
281 _includeIfUnique(booleanQuery, query, occur, like);
282 }
283 catch (Exception e) {
284 if (_log.isWarnEnabled()) {
285 _log.warn(e, e);
286 }
287 }
288 }
289
290 @Override
291 public void addTerm(
292 BooleanQuery booleanQuery, String field, String[] values,
293 boolean like) {
294
295 for (String value : values) {
296 addTerm(booleanQuery, field, value, like);
297 }
298 }
299
300 @Override
301 public void cleanUp(IndexSearcher indexSearcher) {
302 if (indexSearcher == null) {
303 return;
304 }
305
306 try {
307 indexSearcher.close();
308
309 IndexReader indexReader = indexSearcher.getIndexReader();
310
311 if (indexReader != null) {
312 indexReader.close();
313 }
314 }
315 catch (IOException ioe) {
316 _log.error(ioe, ioe);
317 }
318 }
319
320 @Override
321 public int countScoredFieldNames(Query query, String[] filedNames) {
322 int count = 0;
323
324 for (String fieldName : filedNames) {
325 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
326 query, false, fieldName);
327
328 if ((weightedTerms.length > 0) &&
329 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
330
331 count++;
332 }
333 }
334
335 return count;
336 }
337
338 @Override
339 public void delete(long companyId) {
340 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
341
342 if (indexAccessor == null) {
343 return;
344 }
345
346 indexAccessor.delete();
347 }
348
349 @Override
350 public void deleteDocuments(long companyId, Term term) throws IOException {
351 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
352
353 if (indexAccessor == null) {
354 return;
355 }
356
357 indexAccessor.deleteDocuments(term);
358 }
359
360 @Override
361 public void dumpIndex(long companyId, OutputStream outputStream)
362 throws IOException {
363
364 long lastGeneration = getLastGeneration(companyId);
365
366 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
367 if (_log.isDebugEnabled()) {
368 _log.debug(
369 "Dump index from cluster is not enabled for " + companyId);
370 }
371
372 return;
373 }
374
375 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
376
377 if (indexAccessor == null) {
378 return;
379 }
380
381 indexAccessor.dumpIndex(outputStream);
382 }
383
384 @Override
385 public Analyzer getAnalyzer() {
386 return _analyzer;
387 }
388
389 @Override
390 public IndexAccessor getIndexAccessor(long companyId) {
391 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
392
393 if (indexAccessor != null) {
394 return indexAccessor;
395 }
396
397 synchronized (this) {
398 indexAccessor = _indexAccessors.get(companyId);
399
400 if (indexAccessor == null) {
401 indexAccessor = new IndexAccessorImpl(companyId);
402
403 if (isLoadIndexFromClusterEnabled()) {
404 indexAccessor = new SynchronizedIndexAccessorImpl(
405 indexAccessor);
406
407 boolean clusterForwardMessage = GetterUtil.getBoolean(
408 MessageValuesThreadLocal.getValue(
409 ClusterLink.CLUSTER_FORWARD_MESSAGE));
410
411 if (clusterForwardMessage) {
412 if (_log.isInfoEnabled()) {
413 _log.info(
414 "Skip Luncene index files cluster loading " +
415 "since this is a manual reindex request");
416 }
417 }
418 else {
419 try {
420 _loadIndexFromCluster(
421 indexAccessor,
422 indexAccessor.getLastGeneration());
423 }
424 catch (Exception e) {
425 _log.error(
426 "Unable to load index for company " +
427 indexAccessor.getCompanyId(),
428 e);
429 }
430 }
431 }
432
433 _indexAccessors.put(companyId, indexAccessor);
434 }
435 }
436
437 return indexAccessor;
438 }
439
440 @Override
441 public long getLastGeneration(long companyId) {
442 if (!isLoadIndexFromClusterEnabled()) {
443 return IndexAccessor.DEFAULT_LAST_GENERATION;
444 }
445
446 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
447
448 if (indexAccessor == null) {
449 return IndexAccessor.DEFAULT_LAST_GENERATION;
450 }
451
452 return indexAccessor.getLastGeneration();
453 }
454
455 @Override
456 public InputStream getLoadIndexesInputStreamFromCluster(
457 long companyId, Address bootupAddress)
458 throws SystemException {
459
460 if (!isLoadIndexFromClusterEnabled()) {
461 return null;
462 }
463
464 InputStream inputStream = null;
465
466 try {
467 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
468 _getBootupClusterNodeObjectValuePair(bootupAddress);
469
470 URL url = bootupClusterNodeObjectValuePair.getValue();
471
472 URLConnection urlConnection = url.openConnection();
473
474 urlConnection.setDoOutput(true);
475
476 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
477 urlConnection.getOutputStream());
478
479 unsyncPrintWriter.write("transientToken=");
480 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
481 unsyncPrintWriter.write("&companyId=");
482 unsyncPrintWriter.write(String.valueOf(companyId));
483
484 unsyncPrintWriter.close();
485
486 inputStream = urlConnection.getInputStream();
487
488 return inputStream;
489 }
490 catch (IOException ioe) {
491 throw new SystemException(ioe);
492 }
493 }
494
495 @Override
496 public Set<String> getQueryTerms(Query query) {
497 String queryString = StringUtil.replace(
498 query.toString(), StringPool.STAR, StringPool.BLANK);
499
500 Query tempQuery = null;
501
502 try {
503 QueryParser queryParser = new QueryParser(
504 getVersion(), StringPool.BLANK, getAnalyzer());
505
506 tempQuery = queryParser.parse(queryString);
507 }
508 catch (Exception e) {
509 if (_log.isWarnEnabled()) {
510 _log.warn("Unable to parse " + queryString);
511 }
512
513 tempQuery = query;
514 }
515
516 WeightedTerm[] weightedTerms = null;
517
518 for (String fieldName : Field.KEYWORDS) {
519 weightedTerms = QueryTermExtractor.getTerms(
520 tempQuery, false, fieldName);
521
522 if (weightedTerms.length > 0) {
523 break;
524 }
525 }
526
527 Set<String> queryTerms = new HashSet<String>();
528
529 for (WeightedTerm weightedTerm : weightedTerms) {
530 queryTerms.add(weightedTerm.getTerm());
531 }
532
533 return queryTerms;
534 }
535
536 @Override
537 public IndexSearcher getSearcher(long companyId, boolean readOnly)
538 throws IOException {
539
540 IndexAccessor indexAccessor = getIndexAccessor(companyId);
541
542 IndexReader indexReader = IndexReader.open(
543 indexAccessor.getLuceneDir(), readOnly);
544
545 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
546
547 indexSearcher.setDefaultFieldSortScoring(true, true);
548 indexSearcher.setSimilarity(new FieldWeightSimilarity());
549
550 return indexSearcher;
551 }
552
553 @Override
554 public String getSnippet(
555 Query query, String field, String s, int maxNumFragments,
556 int fragmentLength, String fragmentSuffix, Formatter formatter)
557 throws IOException {
558
559 QueryScorer queryScorer = new QueryScorer(query, field);
560
561 Highlighter highlighter = new Highlighter(formatter, queryScorer);
562
563 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
564
565 TokenStream tokenStream = getAnalyzer().tokenStream(
566 field, new UnsyncStringReader(s));
567
568 try {
569 String snippet = highlighter.getBestFragments(
570 tokenStream, s, maxNumFragments, fragmentSuffix);
571
572 if (Validator.isNotNull(snippet) &&
573 !StringUtil.endsWith(snippet, fragmentSuffix) &&
574 !s.equals(snippet)) {
575
576 snippet = snippet.concat(fragmentSuffix);
577 }
578
579 return snippet;
580 }
581 catch (InvalidTokenOffsetsException itoe) {
582 throw new IOException(itoe.getMessage());
583 }
584 }
585
586 @Override
587 public Version getVersion() {
588 return _version;
589 }
590
591 @Override
592 public boolean isLoadIndexFromClusterEnabled() {
593 if (PropsValues.CLUSTER_LINK_ENABLED &&
594 PropsValues.LUCENE_REPLICATE_WRITE) {
595
596 return true;
597 }
598
599 if (_log.isDebugEnabled()) {
600 _log.debug("Load index from cluster is not enabled");
601 }
602
603 return false;
604 }
605
606 @Override
607 public void loadIndex(long companyId, InputStream inputStream)
608 throws IOException {
609
610 if (!isLoadIndexFromClusterEnabled()) {
611 return;
612 }
613
614 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
615
616 if (indexAccessor == null) {
617 if (_log.isInfoEnabled()) {
618 _log.info(
619 "Skip loading Lucene index files for company " + companyId +
620 " in favor of lazy loading");
621 }
622
623 return;
624 }
625
626 StopWatch stopWatch = null;
627
628 if (_log.isInfoEnabled()) {
629 _log.info(
630 "Start loading Lucene index files for company " + companyId);
631
632 stopWatch = new StopWatch();
633
634 stopWatch.start();
635 }
636
637 indexAccessor.loadIndex(inputStream);
638
639 if (_log.isInfoEnabled()) {
640 _log.info(
641 "Finished loading index files for company " + companyId +
642 " in " + stopWatch.getTime() + " ms");
643 }
644 }
645
646 @Override
647 public void loadIndexesFromCluster(long companyId) throws SystemException {
648 if (!isLoadIndexFromClusterEnabled()) {
649 return;
650 }
651
652 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
653
654 if (indexAccessor == null) {
655 return;
656 }
657
658 long localLastGeneration = getLastGeneration(companyId);
659
660 _loadIndexFromCluster(indexAccessor, localLastGeneration);
661 }
662
663 public void setAnalyzer(Analyzer analyzer) {
664 _analyzer = analyzer;
665 }
666
667 public void setVersion(Version version) {
668 _version = version;
669 }
670
671 @Override
672 public void shutdown() {
673 if (_luceneIndexThreadPoolExecutor != null) {
674 _luceneIndexThreadPoolExecutor.shutdownNow();
675
676 try {
677 _luceneIndexThreadPoolExecutor.awaitTermination(
678 60, TimeUnit.SECONDS);
679 }
680 catch (InterruptedException ie) {
681 _log.error("Lucene indexer shutdown interrupted", ie);
682 }
683 }
684
685 if (isLoadIndexFromClusterEnabled()) {
686 ClusterExecutorUtil.removeClusterEventListener(
687 _loadIndexClusterEventListener);
688 }
689
690 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
691 indexAccessor.close();
692 }
693 }
694
695 @Override
696 public void shutdown(long companyId) {
697 IndexAccessor indexAccessor = getIndexAccessor(companyId);
698
699 _indexAccessors.remove(indexAccessor);
700
701 indexAccessor.close();
702 }
703
704 @Override
705 public void startup(long companyId) {
706 if (!PropsValues.INDEX_ON_STARTUP) {
707 return;
708 }
709
710 if (_log.isInfoEnabled()) {
711 _log.info("Indexing Lucene on startup");
712 }
713
714 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
715
716 if (PropsValues.INDEX_WITH_THREAD) {
717 if (_luceneIndexThreadPoolExecutor == null) {
718
719
720
721
722
723 _luceneIndexThreadPoolExecutor =
724 PortalExecutorManagerUtil.getPortalExecutor(
725 LuceneHelperImpl.class.getName());
726 }
727
728 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
729 }
730 else {
731 luceneIndexer.reindex();
732 }
733 }
734
735 @Override
736 public void updateDocument(long companyId, Term term, Document document)
737 throws IOException {
738
739 IndexAccessor indexAccessor = getIndexAccessor(companyId);
740
741 indexAccessor.updateDocument(term, document);
742 }
743
744 private LuceneHelperImpl() {
745 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
746 _luceneIndexThreadPoolExecutor =
747 PortalExecutorManagerUtil.getPortalExecutor(
748 LuceneHelperImpl.class.getName());
749 }
750
751 if (isLoadIndexFromClusterEnabled()) {
752 _loadIndexClusterEventListener =
753 new LoadIndexClusterEventListener();
754
755 ClusterExecutorUtil.addClusterEventListener(
756 _loadIndexClusterEventListener);
757 }
758
759 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
760 }
761
762 private ObjectValuePair<String, URL>
763 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
764 throws SystemException {
765
766 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
767 new MethodHandler(
768 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
769 bootupAddress);
770
771 FutureClusterResponses futureClusterResponses =
772 ClusterExecutorUtil.execute(clusterRequest);
773
774 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
775 futureClusterResponses.getPartialResults();
776
777 try {
778 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
779 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
780 TimeUnit.MILLISECONDS);
781
782 String transientToken = (String)clusterNodeResponse.getResult();
783
784 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
785
786 InetAddress inetAddress = clusterNode.getInetAddress();
787
788 String fileName = PortalUtil.getPathContext();
789
790 if (!fileName.endsWith(StringPool.SLASH)) {
791 fileName = fileName.concat(StringPool.SLASH);
792 }
793
794 fileName = fileName.concat("lucene/dump");
795
796 URL url = new URL(
797 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
798 fileName);
799
800 return new ObjectValuePair<String, URL>(transientToken, url);
801 }
802 catch (Exception e) {
803 throw new SystemException(e);
804 }
805 }
806
807 private void _includeIfUnique(
808 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
809 boolean like) {
810
811 if (query instanceof TermQuery) {
812 Set<Term> terms = new HashSet<Term>();
813
814 TermQuery termQuery = (TermQuery)query;
815
816 termQuery.extractTerms(terms);
817
818 float boost = termQuery.getBoost();
819
820 for (Term term : terms) {
821 String termValue = term.text();
822
823 if (like) {
824 term = term.createTerm(
825 StringPool.STAR.concat(termValue).concat(
826 StringPool.STAR));
827
828 query = new WildcardQuery(term);
829 }
830 else {
831 query = new TermQuery(term);
832 }
833
834 query.setBoost(boost);
835
836 boolean included = false;
837
838 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
839 if (query.equals(booleanClause.getQuery())) {
840 included = true;
841 }
842 }
843
844 if (!included) {
845 booleanQuery.add(query, occur);
846 }
847 }
848 }
849 else if (query instanceof BooleanQuery) {
850 BooleanQuery curBooleanQuery = (BooleanQuery)query;
851
852 BooleanQuery containerBooleanQuery = new BooleanQuery();
853
854 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
855 _includeIfUnique(
856 containerBooleanQuery, booleanClause.getQuery(),
857 booleanClause.getOccur(), like);
858 }
859
860 if (containerBooleanQuery.getClauses().length > 0) {
861 booleanQuery.add(containerBooleanQuery, occur);
862 }
863 }
864 else {
865 boolean included = false;
866
867 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
868 if (query.equals(booleanClause.getQuery())) {
869 included = true;
870 }
871 }
872
873 if (!included) {
874 booleanQuery.add(query, occur);
875 }
876 }
877 }
878
879 private void _loadIndexFromCluster(
880 IndexAccessor indexAccessor, long localLastGeneration)
881 throws SystemException {
882
883 List<Address> clusterNodeAddresses =
884 ClusterExecutorUtil.getClusterNodeAddresses();
885
886 int clusterNodeAddressesCount = clusterNodeAddresses.size();
887
888 if (clusterNodeAddressesCount <= 1) {
889 if (_log.isDebugEnabled()) {
890 _log.debug(
891 "Do not load indexes because there is either one portal " +
892 "instance or no portal instances in the cluster");
893 }
894
895 return;
896 }
897
898 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
899 new MethodHandler(
900 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
901 true);
902
903 ClusterExecutorUtil.execute(
904 clusterRequest,
905 new LoadIndexClusterResponseCallback(
906 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
907 }
908
909 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
910 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
911
912 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
913 GetterUtil.getInteger(
914 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
915 BooleanQuery.getMaxClauseCount());
916
917 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
918
919 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
920
921 private static MethodKey _createTokenMethodKey = new MethodKey(
922 TransientTokenUtil.class, "createToken", long.class);
923 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
924 LuceneHelperUtil.class, "getLastGeneration", long.class);
925
926 private Analyzer _analyzer;
927 private Map<Long, IndexAccessor> _indexAccessors =
928 new ConcurrentHashMap<Long, IndexAccessor>();
929 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
930 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
931 private Version _version;
932
933 private class LoadIndexClusterEventListener
934 implements ClusterEventListener {
935
936 @Override
937 public void processClusterEvent(ClusterEvent clusterEvent) {
938 ClusterEventType clusterEventType =
939 clusterEvent.getClusterEventType();
940
941 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
942 return;
943 }
944
945 List<Address> clusterNodeAddresses =
946 ClusterExecutorUtil.getClusterNodeAddresses();
947 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
948
949 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
950 if (_log.isDebugEnabled()) {
951 _log.debug(
952 "Number of original cluster members is greater than " +
953 "one");
954 }
955
956 return;
957 }
958
959 long[] companyIds = PortalInstances.getCompanyIds();
960
961 for (long companyId : companyIds) {
962 loadIndexes(companyId);
963 }
964
965 loadIndexes(CompanyConstants.SYSTEM);
966 }
967
968 private void loadIndexes(long companyId) {
969 long lastGeneration = getLastGeneration(companyId);
970
971 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
972 return;
973 }
974
975 try {
976 LuceneClusterUtil.loadIndexesFromCluster(companyId);
977 }
978 catch (Exception e) {
979 _log.error(
980 "Unable to load indexes for company " + companyId, e);
981 }
982 }
983
984 }
985
986 private class LoadIndexClusterResponseCallback
987 extends BaseClusterResponseCallback {
988
989 public LoadIndexClusterResponseCallback(
990 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
991 long localLastGeneration) {
992
993 _indexAccessor = indexAccessor;
994 _clusterNodeAddressesCount = clusterNodeAddressesCount;
995 _localLastGeneration = localLastGeneration;
996
997 _companyId = _indexAccessor.getCompanyId();
998 }
999
1000 @Override
1001 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1002 Address bootupAddress = null;
1003
1004 do {
1005 _clusterNodeAddressesCount--;
1006
1007 ClusterNodeResponse clusterNodeResponse = null;
1008
1009 try {
1010 clusterNodeResponse = blockingQueue.poll(
1011 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1012 TimeUnit.MILLISECONDS);
1013 }
1014 catch (Exception e) {
1015 _log.error("Unable to get cluster node response", e);
1016 }
1017
1018 if (clusterNodeResponse == null) {
1019 if (_log.isDebugEnabled()) {
1020 _log.debug(
1021 "Unable to get cluster node response in " +
1022 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1023 TimeUnit.MILLISECONDS);
1024 }
1025
1026 continue;
1027 }
1028
1029 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1030
1031 if (clusterNode.getPort() > 0) {
1032 try {
1033 long remoteLastGeneration =
1034 (Long)clusterNodeResponse.getResult();
1035
1036 if (remoteLastGeneration > _localLastGeneration) {
1037 bootupAddress = clusterNodeResponse.getAddress();
1038
1039 break;
1040 }
1041 }
1042 catch (Exception e) {
1043 if (_log.isDebugEnabled()) {
1044 _log.debug(
1045 "Suppress exception caused by remote method " +
1046 "invocation",
1047 e);
1048 }
1049
1050 continue;
1051 }
1052 }
1053 else {
1054 if (_log.isDebugEnabled()) {
1055 _log.debug(
1056 "Cluster node " + clusterNode +
1057 " has invalid port");
1058 }
1059 }
1060 }
1061 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1062
1063 if (bootupAddress == null) {
1064 return;
1065 }
1066
1067 if (_log.isInfoEnabled()) {
1068 _log.info(
1069 "Start loading lucene index files from cluster node " +
1070 bootupAddress);
1071 }
1072
1073 InputStream inputStream = null;
1074
1075 try {
1076 inputStream = getLoadIndexesInputStreamFromCluster(
1077 _companyId, bootupAddress);
1078
1079 _indexAccessor.loadIndex(inputStream);
1080
1081 if (_log.isInfoEnabled()) {
1082 _log.info("Lucene index files loaded successfully");
1083 }
1084 }
1085 catch (Exception e) {
1086 _log.error("Unable to load index for company " + _companyId, e);
1087 }
1088 finally {
1089 if (inputStream != null) {
1090 try {
1091 inputStream.close();
1092 }
1093 catch (IOException ioe) {
1094 _log.error(
1095 "Unable to close input stream for company " +
1096 _companyId,
1097 ioe);
1098 }
1099 }
1100 }
1101 }
1102
1103 @Override
1104 public void processTimeoutException(TimeoutException timeoutException) {
1105 _log.error(
1106 "Unable to load index for company " + _companyId,
1107 timeoutException);
1108 }
1109
1110 private int _clusterNodeAddressesCount;
1111 private long _companyId;
1112 private IndexAccessor _indexAccessor;
1113 private long _localLastGeneration;
1114
1115 }
1116
1117 }