001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLink;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.messaging.Destination;
035 import com.liferay.portal.kernel.messaging.MessageBus;
036 import com.liferay.portal.kernel.messaging.MessageBusUtil;
037 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038 import com.liferay.portal.kernel.search.BooleanClauseOccur;
039 import com.liferay.portal.kernel.search.Field;
040 import com.liferay.portal.kernel.search.SearchEngineUtil;
041 import com.liferay.portal.kernel.util.ArrayUtil;
042 import com.liferay.portal.kernel.util.GetterUtil;
043 import com.liferay.portal.kernel.util.MethodHandler;
044 import com.liferay.portal.kernel.util.MethodKey;
045 import com.liferay.portal.kernel.util.ObjectValuePair;
046 import com.liferay.portal.kernel.util.PropsKeys;
047 import com.liferay.portal.kernel.util.StringBundler;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.kernel.util.StringUtil;
050 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051 import com.liferay.portal.kernel.util.Validator;
052 import com.liferay.portal.model.CompanyConstants;
053 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055 import com.liferay.portal.security.auth.TransientTokenUtil;
056 import com.liferay.portal.util.PortalInstances;
057 import com.liferay.portal.util.PortalUtil;
058 import com.liferay.portal.util.PropsUtil;
059 import com.liferay.portal.util.PropsValues;
060
061 import java.io.IOException;
062 import java.io.InputStream;
063 import java.io.OutputStream;
064
065 import java.net.InetAddress;
066 import java.net.URL;
067 import java.net.URLConnection;
068
069 import java.util.HashSet;
070 import java.util.List;
071 import java.util.Map;
072 import java.util.Set;
073 import java.util.concurrent.BlockingQueue;
074 import java.util.concurrent.ConcurrentHashMap;
075 import java.util.concurrent.CountDownLatch;
076 import java.util.concurrent.TimeUnit;
077
078 import org.apache.commons.lang.time.StopWatch;
079 import org.apache.lucene.analysis.Analyzer;
080 import org.apache.lucene.analysis.TokenStream;
081 import org.apache.lucene.document.Document;
082 import org.apache.lucene.index.IndexReader;
083 import org.apache.lucene.index.Term;
084 import org.apache.lucene.queryParser.QueryParser;
085 import org.apache.lucene.search.BooleanClause;
086 import org.apache.lucene.search.BooleanQuery;
087 import org.apache.lucene.search.IndexSearcher;
088 import org.apache.lucene.search.NumericRangeQuery;
089 import org.apache.lucene.search.Query;
090 import org.apache.lucene.search.TermQuery;
091 import org.apache.lucene.search.TermRangeQuery;
092 import org.apache.lucene.search.WildcardQuery;
093 import org.apache.lucene.search.highlight.Formatter;
094 import org.apache.lucene.search.highlight.Highlighter;
095 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096 import org.apache.lucene.search.highlight.QueryScorer;
097 import org.apache.lucene.search.highlight.SimpleFragmenter;
098 import org.apache.lucene.search.highlight.WeightedTerm;
099 import org.apache.lucene.util.Version;
100
101
110 public class LuceneHelperImpl implements LuceneHelper {
111
112 @Override
113 public void addDocument(long companyId, Document document)
114 throws IOException {
115
116 IndexAccessor indexAccessor = getIndexAccessor(companyId);
117
118 indexAccessor.addDocument(document);
119 }
120
121 @Override
122 public void addExactTerm(
123 BooleanQuery booleanQuery, String field, String value) {
124
125 addTerm(booleanQuery, field, value, false);
126 }
127
128 @Override
129 public void addNumericRangeTerm(
130 BooleanQuery booleanQuery, String field, Integer startValue,
131 Integer endValue) {
132
133 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134 field, startValue, endValue, true, true);
135
136 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137 }
138
139 @Override
140 public void addNumericRangeTerm(
141 BooleanQuery booleanQuery, String field, Long startValue,
142 Long endValue) {
143
144 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145 field, startValue, endValue, true, true);
146
147 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148 }
149
150
154 @Override
155 public void addNumericRangeTerm(
156 BooleanQuery booleanQuery, String field, String startValue,
157 String endValue) {
158
159 addNumericRangeTerm(
160 booleanQuery, field, GetterUtil.getLong(startValue),
161 GetterUtil.getLong(endValue));
162 }
163
164 @Override
165 public void addRangeTerm(
166 BooleanQuery booleanQuery, String field, String startValue,
167 String endValue) {
168
169 boolean includesLower = true;
170
171 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172 includesLower = false;
173 }
174
175 boolean includesUpper = true;
176
177 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178 includesUpper = false;
179 }
180
181 TermRangeQuery termRangeQuery = new TermRangeQuery(
182 field, startValue, endValue, includesLower, includesUpper);
183
184 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185 }
186
187 @Override
188 public void addRequiredTerm(
189 BooleanQuery booleanQuery, String field, String value, boolean like) {
190
191 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192 }
193
194 @Override
195 public void addRequiredTerm(
196 BooleanQuery booleanQuery, String field, String[] values,
197 boolean like) {
198
199 if (values == null) {
200 return;
201 }
202
203 BooleanQuery query = new BooleanQuery();
204
205 for (String value : values) {
206 addTerm(query, field, value, like);
207 }
208
209 booleanQuery.add(query, BooleanClause.Occur.MUST);
210 }
211
212 @Override
213 public void addTerm(
214 BooleanQuery booleanQuery, String field, String value, boolean like) {
215
216 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217 }
218
219 @Override
220 public void addTerm(
221 BooleanQuery booleanQuery, String field, String value, boolean like,
222 BooleanClauseOccur booleanClauseOccur) {
223
224 if (Validator.isNull(value)) {
225 return;
226 }
227
228 Analyzer analyzer = getAnalyzer();
229
230 if (analyzer instanceof PerFieldAnalyzer) {
231 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232
233 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234
235 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236 like = true;
237 }
238 }
239
240 if (like) {
241 value = StringUtil.replace(
242 value, StringPool.PERCENT, StringPool.BLANK);
243 }
244
245 try {
246 QueryParser queryParser = new QueryParser(
247 getVersion(), field, analyzer);
248
249 Query query = queryParser.parse(value);
250
251 BooleanClause.Occur occur = null;
252
253 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
254 occur = BooleanClause.Occur.MUST;
255 }
256 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
257 occur = BooleanClause.Occur.MUST_NOT;
258 }
259 else {
260 occur = BooleanClause.Occur.SHOULD;
261 }
262
263 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
264 }
265 catch (Exception e) {
266 if (_log.isWarnEnabled()) {
267 _log.warn(e, e);
268 }
269 }
270 }
271
272 @Override
273 public void addTerm(
274 BooleanQuery booleanQuery, String field, String[] values,
275 boolean like) {
276
277 for (String value : values) {
278 addTerm(booleanQuery, field, value, like);
279 }
280 }
281
282
286 @Deprecated
287 @Override
288 public void cleanUp(IndexSearcher indexSearcher) {
289 if (indexSearcher == null) {
290 return;
291 }
292
293 try {
294 indexSearcher.close();
295
296 IndexReader indexReader = indexSearcher.getIndexReader();
297
298 if (indexReader != null) {
299 indexReader.close();
300 }
301 }
302 catch (IOException ioe) {
303 _log.error(ioe, ioe);
304 }
305 }
306
307 @Override
308 public int countScoredFieldNames(Query query, String[] filedNames) {
309 int count = 0;
310
311 for (String fieldName : filedNames) {
312 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
313 query, false, fieldName);
314
315 if ((weightedTerms.length > 0) &&
316 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
317
318 count++;
319 }
320 }
321
322 return count;
323 }
324
325 @Override
326 public void delete(long companyId) {
327 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
328
329 if (indexAccessor == null) {
330 return;
331 }
332
333 indexAccessor.delete();
334 }
335
336 @Override
337 public void deleteDocuments(long companyId, Term term) throws IOException {
338 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
339
340 if (indexAccessor == null) {
341 return;
342 }
343
344 indexAccessor.deleteDocuments(term);
345 }
346
347 @Override
348 public void dumpIndex(long companyId, OutputStream outputStream)
349 throws IOException {
350
351 long lastGeneration = getLastGeneration(companyId);
352
353 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
354 if (_log.isDebugEnabled()) {
355 _log.debug(
356 "Dump index from cluster is not enabled for " + companyId);
357 }
358
359 return;
360 }
361
362 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
363
364 if (indexAccessor == null) {
365 return;
366 }
367
368 indexAccessor.dumpIndex(outputStream);
369 }
370
371 @Override
372 public Analyzer getAnalyzer() {
373 return _analyzer;
374 }
375
376 @Override
377 public IndexAccessor getIndexAccessor(long companyId) {
378 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
379
380 if (indexAccessor != null) {
381 return indexAccessor;
382 }
383
384 synchronized (this) {
385 indexAccessor = _indexAccessors.get(companyId);
386
387 if (indexAccessor == null) {
388 indexAccessor = new IndexAccessorImpl(companyId);
389
390 if (isLoadIndexFromClusterEnabled()) {
391 indexAccessor = new SynchronizedIndexAccessorImpl(
392 indexAccessor);
393
394 boolean clusterForwardMessage = GetterUtil.getBoolean(
395 MessageValuesThreadLocal.getValue(
396 ClusterLink.CLUSTER_FORWARD_MESSAGE));
397
398 if (clusterForwardMessage) {
399 if (_log.isInfoEnabled()) {
400 _log.info(
401 "Skip Luncene index files cluster loading " +
402 "since this is a manual reindex request");
403 }
404 }
405 else {
406 try {
407 _loadIndexFromCluster(
408 indexAccessor,
409 indexAccessor.getLastGeneration());
410 }
411 catch (Exception e) {
412 _log.error(
413 "Unable to load index for company " +
414 indexAccessor.getCompanyId(),
415 e);
416 }
417 }
418 }
419
420 _indexAccessors.put(companyId, indexAccessor);
421 }
422 }
423
424 return indexAccessor;
425 }
426
427 @Override
428 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
429 IndexAccessor indexAccessor = getIndexAccessor(companyId);
430
431 return indexAccessor.acquireIndexSearcher();
432 }
433
434 @Override
435 public long getLastGeneration(long companyId) {
436 if (!isLoadIndexFromClusterEnabled()) {
437 return IndexAccessor.DEFAULT_LAST_GENERATION;
438 }
439
440 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
441
442 if (indexAccessor == null) {
443 return IndexAccessor.DEFAULT_LAST_GENERATION;
444 }
445
446 return indexAccessor.getLastGeneration();
447 }
448
449 @Override
450 public InputStream getLoadIndexesInputStreamFromCluster(
451 long companyId, Address bootupAddress)
452 throws SystemException {
453
454 if (!isLoadIndexFromClusterEnabled()) {
455 return null;
456 }
457
458 InputStream inputStream = null;
459
460 try {
461 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
462 _getBootupClusterNodeObjectValuePair(bootupAddress);
463
464 URL url = bootupClusterNodeObjectValuePair.getValue();
465
466 URLConnection urlConnection = url.openConnection();
467
468 urlConnection.setDoOutput(true);
469
470 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
471 urlConnection.getOutputStream());
472
473 unsyncPrintWriter.write("transientToken=");
474 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
475 unsyncPrintWriter.write("&companyId=");
476 unsyncPrintWriter.write(String.valueOf(companyId));
477
478 unsyncPrintWriter.close();
479
480 inputStream = urlConnection.getInputStream();
481
482 return inputStream;
483 }
484 catch (IOException ioe) {
485 throw new SystemException(ioe);
486 }
487 }
488
489 @Override
490 public Set<String> getQueryTerms(Query query) {
491 String queryString = StringUtil.replace(
492 query.toString(), StringPool.STAR, StringPool.BLANK);
493
494 Query tempQuery = null;
495
496 try {
497 QueryParser queryParser = new QueryParser(
498 getVersion(), StringPool.BLANK, getAnalyzer());
499
500 tempQuery = queryParser.parse(queryString);
501 }
502 catch (Exception e) {
503 if (_log.isWarnEnabled()) {
504 _log.warn("Unable to parse " + queryString);
505 }
506
507 tempQuery = query;
508 }
509
510 WeightedTerm[] weightedTerms = null;
511
512 for (String fieldName : Field.KEYWORDS) {
513 weightedTerms = QueryTermExtractor.getTerms(
514 tempQuery, false, fieldName);
515
516 if (weightedTerms.length > 0) {
517 break;
518 }
519 }
520
521 Set<String> queryTerms = new HashSet<String>();
522
523 for (WeightedTerm weightedTerm : weightedTerms) {
524 queryTerms.add(weightedTerm.getTerm());
525 }
526
527 return queryTerms;
528 }
529
530
533 @Deprecated
534 @Override
535 public IndexSearcher getSearcher(long companyId, boolean readOnly)
536 throws IOException {
537
538 IndexAccessor indexAccessor = getIndexAccessor(companyId);
539
540 IndexReader indexReader = IndexReader.open(
541 indexAccessor.getLuceneDir(), readOnly);
542
543 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
544
545 indexSearcher.setDefaultFieldSortScoring(true, false);
546 indexSearcher.setSimilarity(new FieldWeightSimilarity());
547
548 return indexSearcher;
549 }
550
551 @Override
552 public String getSnippet(
553 Query query, String field, String s, int maxNumFragments,
554 int fragmentLength, String fragmentSuffix, Formatter formatter)
555 throws IOException {
556
557 QueryScorer queryScorer = new QueryScorer(query, field);
558
559 Highlighter highlighter = new Highlighter(formatter, queryScorer);
560
561 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
562
563 TokenStream tokenStream = getAnalyzer().tokenStream(
564 field, new UnsyncStringReader(s));
565
566 try {
567 String snippet = highlighter.getBestFragments(
568 tokenStream, s, maxNumFragments, fragmentSuffix);
569
570 if (Validator.isNotNull(snippet) &&
571 !StringUtil.endsWith(snippet, fragmentSuffix) &&
572 !s.equals(snippet)) {
573
574 snippet = snippet.concat(fragmentSuffix);
575 }
576
577 return snippet;
578 }
579 catch (InvalidTokenOffsetsException itoe) {
580 throw new IOException(itoe.getMessage());
581 }
582 }
583
584 @Override
585 public Version getVersion() {
586 return _version;
587 }
588
589 @Override
590 public boolean isLoadIndexFromClusterEnabled() {
591 if (PropsValues.CLUSTER_LINK_ENABLED &&
592 PropsValues.LUCENE_REPLICATE_WRITE) {
593
594 return true;
595 }
596
597 if (_log.isDebugEnabled()) {
598 _log.debug("Load index from cluster is not enabled");
599 }
600
601 return false;
602 }
603
604 @Override
605 public void loadIndex(long companyId, InputStream inputStream)
606 throws IOException {
607
608 if (!isLoadIndexFromClusterEnabled()) {
609 return;
610 }
611
612 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
613
614 if (indexAccessor == null) {
615 if (_log.isInfoEnabled()) {
616 _log.info(
617 "Skip loading Lucene index files for company " + companyId +
618 " in favor of lazy loading");
619 }
620
621 return;
622 }
623
624 StopWatch stopWatch = new StopWatch();
625
626 stopWatch.start();
627
628 if (_log.isInfoEnabled()) {
629 _log.info(
630 "Start loading Lucene index files for company " + companyId);
631 }
632
633 indexAccessor.loadIndex(inputStream);
634
635 if (_log.isInfoEnabled()) {
636 _log.info(
637 "Finished loading index files for company " + companyId +
638 " in " + stopWatch.getTime() + " ms");
639 }
640 }
641
642 @Override
643 public void loadIndexesFromCluster(long companyId) throws SystemException {
644 if (!isLoadIndexFromClusterEnabled()) {
645 return;
646 }
647
648 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
649
650 if (indexAccessor == null) {
651 return;
652 }
653
654 long localLastGeneration = getLastGeneration(companyId);
655
656 _loadIndexFromCluster(indexAccessor, localLastGeneration);
657 }
658
659 @Override
660 public void releaseIndexSearcher(
661 long companyId, IndexSearcher indexSearcher)
662 throws IOException {
663
664 IndexAccessor indexAccessor = getIndexAccessor(companyId);
665
666 indexAccessor.releaseIndexSearcher(indexSearcher);
667 }
668
669 public void setAnalyzer(Analyzer analyzer) {
670 _analyzer = analyzer;
671 }
672
673 public void setVersion(Version version) {
674 _version = version;
675 }
676
677 @Override
678 public void shutdown() {
679 if (_luceneIndexThreadPoolExecutor != null) {
680 _luceneIndexThreadPoolExecutor.shutdownNow();
681
682 try {
683 _luceneIndexThreadPoolExecutor.awaitTermination(
684 60, TimeUnit.SECONDS);
685 }
686 catch (InterruptedException ie) {
687 _log.error("Lucene indexer shutdown interrupted", ie);
688 }
689 }
690
691 if (isLoadIndexFromClusterEnabled()) {
692 ClusterExecutorUtil.removeClusterEventListener(
693 _loadIndexClusterEventListener);
694 }
695
696 MessageBus messageBus = MessageBusUtil.getMessageBus();
697
698 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
699 String searchWriterDestinationName =
700 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
701
702 Destination searchWriteDestination = messageBus.getDestination(
703 searchWriterDestinationName);
704
705 if (searchWriteDestination != null) {
706 ThreadPoolExecutor threadPoolExecutor =
707 PortalExecutorManagerUtil.getPortalExecutor(
708 searchWriterDestinationName);
709
710 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
711
712 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
713
714 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
715 countDownLatch);
716
717 for (int i = 0; i < maxPoolSize; i++) {
718 threadPoolExecutor.submit(shutdownSyncJob);
719 }
720
721 try {
722 countDownLatch.await();
723 }
724 catch (InterruptedException ie) {
725 _log.error("Shutdown waiting interrupted", ie);
726 }
727
728 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
729
730 if (_log.isDebugEnabled()) {
731 _log.debug(
732 "Cancelled appending indexing jobs: " + runnables);
733 }
734
735 searchWriteDestination.close(true);
736 }
737 }
738
739 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
740 indexAccessor.close();
741 }
742 }
743
744 @Override
745 public void shutdown(long companyId) {
746 IndexAccessor indexAccessor = getIndexAccessor(companyId);
747
748 _indexAccessors.remove(indexAccessor);
749
750 indexAccessor.close();
751 }
752
753 @Override
754 public void startup(long companyId) {
755 if (!PropsValues.INDEX_ON_STARTUP) {
756 return;
757 }
758
759 if (_log.isInfoEnabled()) {
760 _log.info("Indexing Lucene on startup");
761 }
762
763 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
764
765 if (PropsValues.INDEX_WITH_THREAD) {
766 if (_luceneIndexThreadPoolExecutor == null) {
767
768
769
770
771
772 _luceneIndexThreadPoolExecutor =
773 PortalExecutorManagerUtil.getPortalExecutor(
774 LuceneHelperImpl.class.getName());
775 }
776
777 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
778 }
779 else {
780 luceneIndexer.reindex();
781 }
782 }
783
784 @Override
785 public void updateDocument(long companyId, Term term, Document document)
786 throws IOException {
787
788 IndexAccessor indexAccessor = getIndexAccessor(companyId);
789
790 indexAccessor.updateDocument(term, document);
791 }
792
/**
 * Looks up the startup indexer thread pool when indexing on startup runs in
 * a thread, registers the cluster event listener when loading from the
 * cluster is enabled, and applies the configured maximum boolean query
 * clause count.
 */
private LuceneHelperImpl() {
	boolean indexOnStartupWithThread =
		PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

	if (indexOnStartupWithThread) {
		_luceneIndexThreadPoolExecutor =
			PortalExecutorManagerUtil.getPortalExecutor(
				LuceneHelperImpl.class.getName());
	}

	if (isLoadIndexFromClusterEnabled()) {
		LoadIndexClusterEventListener listener =
			new LoadIndexClusterEventListener();

		_loadIndexClusterEventListener = listener;

		ClusterExecutorUtil.addClusterEventListener(listener);
	}

	BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
}
810
811 private ObjectValuePair<String, URL>
812 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
813 throws SystemException {
814
815 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
816 new MethodHandler(
817 _createTokenMethodKey,
818 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
819 bootupAddress);
820
821 FutureClusterResponses futureClusterResponses =
822 ClusterExecutorUtil.execute(clusterRequest);
823
824 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
825 futureClusterResponses.getPartialResults();
826
827 try {
828 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
829 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
830 TimeUnit.MILLISECONDS);
831
832 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
833
834 int port = clusterNode.getPort();
835
836 if (port <= 0) {
837 StringBundler sb = new StringBundler(6);
838
839 sb.append("Invalid cluster node port ");
840 sb.append(port);
841 sb.append(". The port is set by the first request or ");
842 sb.append("configured in portal.properties by the properties ");
843 sb.append("\"portal.instance.http.port\" and ");
844 sb.append("\"portal.instance.https.port\".");
845
846 throw new Exception(sb.toString());
847 }
848
849 String protocol = clusterNode.getPortalProtocol();
850
851 if (Validator.isNull(protocol)) {
852 StringBundler sb = new StringBundler(4);
853
854 sb.append("Cluster node protocol is empty. The protocol is ");
855 sb.append("set by the first request or configured in ");
856 sb.append("portal.properties by the property ");
857 sb.append("\"portal.instance.protocol\"");
858
859 throw new Exception(sb.toString());
860 }
861
862 InetAddress inetAddress = clusterNode.getInetAddress();
863
864 String fileName = PortalUtil.getPathContext();
865
866 if (!fileName.endsWith(StringPool.SLASH)) {
867 fileName = fileName.concat(StringPool.SLASH);
868 }
869
870 fileName = fileName.concat("lucene/dump");
871
872 URL url = new URL(
873 protocol, inetAddress.getHostAddress(), port, fileName);
874
875 String transientToken = (String)clusterNodeResponse.getResult();
876
877 return new ObjectValuePair<String, URL>(transientToken, url);
878 }
879 catch (Exception e) {
880 throw new SystemException(e);
881 }
882 }
883
884 private void _handleFutureClusterResponses(
885 FutureClusterResponses futureClusterResponses,
886 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
887 long localLastGeneration) {
888
889 BlockingQueue<ClusterNodeResponse> blockingQueue =
890 futureClusterResponses.getPartialResults();
891
892 long companyId = indexAccessor.getCompanyId();
893
894 Address bootupAddress = null;
895
896 do {
897 clusterNodeAddressesCount--;
898
899 ClusterNodeResponse clusterNodeResponse = null;
900
901 try {
902 clusterNodeResponse = blockingQueue.poll(
903 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
904 TimeUnit.MILLISECONDS);
905 }
906 catch (Exception e) {
907 _log.error("Unable to get cluster node response", e);
908 }
909
910 if (clusterNodeResponse == null) {
911 if (_log.isDebugEnabled()) {
912 _log.debug(
913 "Unable to get cluster node response in " +
914 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
915 TimeUnit.MILLISECONDS);
916 }
917
918 continue;
919 }
920
921 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
922
923 if (clusterNode.getPort() > 0) {
924 try {
925 long remoteLastGeneration =
926 (Long)clusterNodeResponse.getResult();
927
928 if (remoteLastGeneration > localLastGeneration) {
929 bootupAddress = clusterNodeResponse.getAddress();
930
931 break;
932 }
933 }
934 catch (Exception e) {
935 if (_log.isDebugEnabled()) {
936 _log.debug(
937 "Suppress exception caused by remote method " +
938 "invocation",
939 e);
940 }
941
942 continue;
943 }
944 }
945 else {
946 if (_log.isDebugEnabled()) {
947 _log.debug(
948 "Cluster node " + clusterNode +
949 " has invalid port");
950 }
951 }
952 }
953 while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
954
955 if (bootupAddress == null) {
956 return;
957 }
958
959 if (_log.isInfoEnabled()) {
960 _log.info(
961 "Start loading lucene index files from cluster node " +
962 bootupAddress);
963 }
964
965 InputStream inputStream = null;
966
967 try {
968 inputStream = getLoadIndexesInputStreamFromCluster(
969 companyId, bootupAddress);
970
971 indexAccessor.loadIndex(inputStream);
972
973 if (_log.isInfoEnabled()) {
974 _log.info("Lucene index files loaded successfully");
975 }
976 }
977 catch (Exception e) {
978 _log.error("Unable to load index for company " + companyId, e);
979 }
980 finally {
981 if (inputStream != null) {
982 try {
983 inputStream.close();
984 }
985 catch (IOException ioe) {
986 _log.error(
987 "Unable to close input stream for company " +
988 companyId,
989 ioe);
990 }
991 }
992 }
993 }
994
995 private void _includeIfUnique(
996 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
997 Query query, BooleanClause.Occur occur) {
998
999 if (query instanceof TermQuery) {
1000 Set<Term> terms = new HashSet<Term>();
1001
1002 TermQuery termQuery = (TermQuery)query;
1003
1004 termQuery.extractTerms(terms);
1005
1006 float boost = termQuery.getBoost();
1007
1008 for (Term term : terms) {
1009 String termValue = term.text();
1010
1011 if (like) {
1012 termValue = termValue.toLowerCase(queryParser.getLocale());
1013
1014 term = term.createTerm(
1015 StringPool.STAR.concat(termValue).concat(
1016 StringPool.STAR));
1017
1018 query = new WildcardQuery(term);
1019 }
1020 else {
1021 query = new TermQuery(term);
1022 }
1023
1024 query.setBoost(boost);
1025
1026 boolean included = false;
1027
1028 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1029 if (query.equals(booleanClause.getQuery())) {
1030 included = true;
1031 }
1032 }
1033
1034 if (!included) {
1035 booleanQuery.add(query, occur);
1036 }
1037 }
1038 }
1039 else if (query instanceof BooleanQuery) {
1040 BooleanQuery curBooleanQuery = (BooleanQuery)query;
1041
1042 BooleanQuery containerBooleanQuery = new BooleanQuery();
1043
1044 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1045 _includeIfUnique(
1046 containerBooleanQuery, like, queryParser,
1047 booleanClause.getQuery(), booleanClause.getOccur());
1048 }
1049
1050 if (containerBooleanQuery.getClauses().length > 0) {
1051 booleanQuery.add(containerBooleanQuery, occur);
1052 }
1053 }
1054 else {
1055 boolean included = false;
1056
1057 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1058 if (query.equals(booleanClause.getQuery())) {
1059 included = true;
1060 }
1061 }
1062
1063 if (!included) {
1064 booleanQuery.add(query, occur);
1065 }
1066 }
1067 }
1068
1069 private void _loadIndexFromCluster(
1070 IndexAccessor indexAccessor, long localLastGeneration)
1071 throws SystemException {
1072
1073 List<Address> clusterNodeAddresses =
1074 ClusterExecutorUtil.getClusterNodeAddresses();
1075
1076 int clusterNodeAddressesCount = clusterNodeAddresses.size();
1077
1078 if (clusterNodeAddressesCount <= 1) {
1079 if (_log.isDebugEnabled()) {
1080 _log.debug(
1081 "Do not load indexes because there is either one portal " +
1082 "instance or no portal instances in the cluster");
1083 }
1084
1085 return;
1086 }
1087
1088 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1089 new MethodHandler(
1090 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1091 true);
1092
1093 FutureClusterResponses futureClusterResponses =
1094 ClusterExecutorUtil.execute(clusterRequest);
1095
1096 _handleFutureClusterResponses(
1097 futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1098 localLastGeneration);
1099 }
1100
1101 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1102 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1103
1104 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1105 GetterUtil.getInteger(
1106 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1107 BooleanQuery.getMaxClauseCount());
1108
1109 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1110
1111 private static MethodKey _createTokenMethodKey = new MethodKey(
1112 TransientTokenUtil.class, "createToken", long.class);
1113 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1114 LuceneHelperUtil.class, "getLastGeneration", long.class);
1115
1116 private Analyzer _analyzer;
1117 private Map<Long, IndexAccessor> _indexAccessors =
1118 new ConcurrentHashMap<Long, IndexAccessor>();
1119 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1120 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1121 private Version _version;
1122
1123 private static class ShutdownSyncJob implements Runnable {
1124
1125 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1126 _countDownLatch = countDownLatch;
1127 }
1128
1129 @Override
1130 public void run() {
1131 _countDownLatch.countDown();
1132
1133 try {
1134 synchronized (this) {
1135 wait();
1136 }
1137 }
1138 catch (InterruptedException ie) {
1139 }
1140 }
1141
1142 private final CountDownLatch _countDownLatch;
1143
1144 }
1145
1146 private class LoadIndexClusterEventListener
1147 implements ClusterEventListener {
1148
1149 @Override
1150 public void processClusterEvent(ClusterEvent clusterEvent) {
1151 ClusterEventType clusterEventType =
1152 clusterEvent.getClusterEventType();
1153
1154 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1155 return;
1156 }
1157
1158 List<Address> clusterNodeAddresses =
1159 ClusterExecutorUtil.getClusterNodeAddresses();
1160 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1161
1162 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1163 if (_log.isDebugEnabled()) {
1164 _log.debug(
1165 "Number of original cluster members is greater than " +
1166 "one");
1167 }
1168
1169 return;
1170 }
1171
1172 long[] companyIds = PortalInstances.getCompanyIds();
1173
1174 for (long companyId : companyIds) {
1175 loadIndexes(companyId);
1176 }
1177
1178 loadIndexes(CompanyConstants.SYSTEM);
1179 }
1180
1181 private void loadIndexes(long companyId) {
1182 long lastGeneration = getLastGeneration(companyId);
1183
1184 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1185 return;
1186 }
1187
1188 try {
1189 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1190 }
1191 catch (Exception e) {
1192 _log.error(
1193 "Unable to load indexes for company " + companyId, e);
1194 }
1195 }
1196
1197 }
1198
1199 }