001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLink;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
030 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
031 import com.liferay.portal.kernel.exception.SystemException;
032 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
033 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
034 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
035 import com.liferay.portal.kernel.log.Log;
036 import com.liferay.portal.kernel.log.LogFactoryUtil;
037 import com.liferay.portal.kernel.messaging.Destination;
038 import com.liferay.portal.kernel.messaging.MessageBus;
039 import com.liferay.portal.kernel.messaging.MessageBusUtil;
040 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
041 import com.liferay.portal.kernel.search.BooleanClauseOccur;
042 import com.liferay.portal.kernel.search.Field;
043 import com.liferay.portal.kernel.search.SearchEngineUtil;
044 import com.liferay.portal.kernel.util.ArrayUtil;
045 import com.liferay.portal.kernel.util.GetterUtil;
046 import com.liferay.portal.kernel.util.Http;
047 import com.liferay.portal.kernel.util.MethodHandler;
048 import com.liferay.portal.kernel.util.MethodKey;
049 import com.liferay.portal.kernel.util.ObjectValuePair;
050 import com.liferay.portal.kernel.util.PropsKeys;
051 import com.liferay.portal.kernel.util.StringBundler;
052 import com.liferay.portal.kernel.util.StringPool;
053 import com.liferay.portal.kernel.util.StringUtil;
054 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
055 import com.liferay.portal.kernel.util.Validator;
056 import com.liferay.portal.model.CompanyConstants;
057 import com.liferay.portal.search.SearchEngineInitializer;
058 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
059 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
060 import com.liferay.portal.security.auth.TransientTokenUtil;
061 import com.liferay.portal.util.PortalInstances;
062 import com.liferay.portal.util.PortalUtil;
063 import com.liferay.portal.util.PropsUtil;
064 import com.liferay.portal.util.PropsValues;
065
066 import java.io.IOException;
067 import java.io.InputStream;
068 import java.io.OutputStream;
069
070 import java.net.InetAddress;
071 import java.net.InetSocketAddress;
072 import java.net.URL;
073 import java.net.URLConnection;
074
075 import java.util.HashSet;
076 import java.util.List;
077 import java.util.Map;
078 import java.util.Set;
079 import java.util.concurrent.BlockingQueue;
080 import java.util.concurrent.ConcurrentHashMap;
081 import java.util.concurrent.CountDownLatch;
082 import java.util.concurrent.Future;
083 import java.util.concurrent.TimeUnit;
084
085 import org.apache.commons.lang.time.StopWatch;
086 import org.apache.lucene.analysis.Analyzer;
087 import org.apache.lucene.analysis.TokenStream;
088 import org.apache.lucene.document.Document;
089 import org.apache.lucene.index.IndexReader;
090 import org.apache.lucene.index.Term;
091 import org.apache.lucene.queryParser.QueryParser;
092 import org.apache.lucene.search.BooleanClause;
093 import org.apache.lucene.search.BooleanQuery;
094 import org.apache.lucene.search.IndexSearcher;
095 import org.apache.lucene.search.NumericRangeQuery;
096 import org.apache.lucene.search.Query;
097 import org.apache.lucene.search.TermQuery;
098 import org.apache.lucene.search.TermRangeQuery;
099 import org.apache.lucene.search.WildcardQuery;
100 import org.apache.lucene.search.highlight.Formatter;
101 import org.apache.lucene.search.highlight.Highlighter;
102 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
103 import org.apache.lucene.search.highlight.QueryScorer;
104 import org.apache.lucene.search.highlight.SimpleFragmenter;
105 import org.apache.lucene.search.highlight.WeightedTerm;
106 import org.apache.lucene.util.Version;
107
108
117 public class LuceneHelperImpl implements LuceneHelper {
118
119 @Override
120 public void addDocument(long companyId, Document document)
121 throws IOException {
122
123 IndexAccessor indexAccessor = getIndexAccessor(companyId);
124
125 indexAccessor.addDocument(document);
126 }
127
128 @Override
129 public void addExactTerm(
130 BooleanQuery booleanQuery, String field, String value) {
131
132 addTerm(booleanQuery, field, value, false);
133 }
134
135 @Override
136 public void addNumericRangeTerm(
137 BooleanQuery booleanQuery, String field, Integer startValue,
138 Integer endValue) {
139
140 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
141 field, startValue, endValue, true, true);
142
143 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
144 }
145
146 @Override
147 public void addNumericRangeTerm(
148 BooleanQuery booleanQuery, String field, Long startValue,
149 Long endValue) {
150
151 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
152 field, startValue, endValue, true, true);
153
154 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
155 }
156
157
161 @Deprecated
162 @Override
163 public void addNumericRangeTerm(
164 BooleanQuery booleanQuery, String field, String startValue,
165 String endValue) {
166
167 addNumericRangeTerm(
168 booleanQuery, field, GetterUtil.getLong(startValue),
169 GetterUtil.getLong(endValue));
170 }
171
172 @Override
173 public void addRangeTerm(
174 BooleanQuery booleanQuery, String field, String startValue,
175 String endValue) {
176
177 boolean includesLower = true;
178
179 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
180 includesLower = false;
181 }
182
183 boolean includesUpper = true;
184
185 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
186 includesUpper = false;
187 }
188
189 TermRangeQuery termRangeQuery = new TermRangeQuery(
190 field, startValue, endValue, includesLower, includesUpper);
191
192 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
193 }
194
195 @Override
196 public void addRequiredTerm(
197 BooleanQuery booleanQuery, String field, String value, boolean like) {
198
199 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
200 }
201
202 @Override
203 public void addRequiredTerm(
204 BooleanQuery booleanQuery, String field, String[] values,
205 boolean like) {
206
207 if (values == null) {
208 return;
209 }
210
211 BooleanQuery query = new BooleanQuery();
212
213 for (String value : values) {
214 addTerm(query, field, value, like);
215 }
216
217 booleanQuery.add(query, BooleanClause.Occur.MUST);
218 }
219
220 @Override
221 public void addTerm(
222 BooleanQuery booleanQuery, String field, String value, boolean like) {
223
224 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
225 }
226
227 @Override
228 public void addTerm(
229 BooleanQuery booleanQuery, String field, String value, boolean like,
230 BooleanClauseOccur booleanClauseOccur) {
231
232 if (Validator.isNull(value)) {
233 return;
234 }
235
236 Analyzer analyzer = getAnalyzer();
237
238 if (analyzer instanceof PerFieldAnalyzer) {
239 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
240
241 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
242
243 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
244 like = true;
245 }
246 }
247
248 if (like) {
249 value = StringUtil.replace(
250 value, StringPool.PERCENT, StringPool.BLANK);
251 }
252
253 try {
254 QueryParser queryParser = new QueryParser(
255 getVersion(), field, analyzer);
256
257 Query query = queryParser.parse(value);
258
259 BooleanClause.Occur occur = null;
260
261 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
262 occur = BooleanClause.Occur.MUST;
263 }
264 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
265 occur = BooleanClause.Occur.MUST_NOT;
266 }
267 else {
268 occur = BooleanClause.Occur.SHOULD;
269 }
270
271 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
272 }
273 catch (Exception e) {
274 if (_log.isWarnEnabled()) {
275 _log.warn(e, e);
276 }
277 }
278 }
279
280 @Override
281 public void addTerm(
282 BooleanQuery booleanQuery, String field, String[] values,
283 boolean like) {
284
285 for (String value : values) {
286 addTerm(booleanQuery, field, value, like);
287 }
288 }
289
290
294 @Deprecated
295 @Override
296 public void cleanUp(IndexSearcher indexSearcher) {
297 if (indexSearcher == null) {
298 return;
299 }
300
301 try {
302 indexSearcher.close();
303
304 IndexReader indexReader = indexSearcher.getIndexReader();
305
306 if (indexReader != null) {
307 indexReader.close();
308 }
309 }
310 catch (IOException ioe) {
311 _log.error(ioe, ioe);
312 }
313 }
314
315 @Override
316 public int countScoredFieldNames(Query query, String[] filedNames) {
317 int count = 0;
318
319 for (String fieldName : filedNames) {
320 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
321 query, false, fieldName);
322
323 if ((weightedTerms.length > 0) &&
324 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
325
326 count++;
327 }
328 }
329
330 return count;
331 }
332
333 @Override
334 public void delete(long companyId) {
335 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
336
337 if (indexAccessor == null) {
338 return;
339 }
340
341 indexAccessor.delete();
342 }
343
344 @Override
345 public void deleteDocuments(long companyId, Term term) throws IOException {
346 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347
348 if (indexAccessor == null) {
349 return;
350 }
351
352 indexAccessor.deleteDocuments(term);
353 }
354
355 @Override
356 public void dumpIndex(long companyId, OutputStream outputStream)
357 throws IOException {
358
359 long lastGeneration = getLastGeneration(companyId);
360
361 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
362 if (_log.isDebugEnabled()) {
363 _log.debug(
364 "Dump index from cluster is not enabled for " + companyId);
365 }
366
367 return;
368 }
369
370 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
371
372 if (indexAccessor == null) {
373 return;
374 }
375
376 indexAccessor.dumpIndex(outputStream);
377 }
378
379 @Override
380 public Analyzer getAnalyzer() {
381 return _analyzer;
382 }
383
384 @Override
385 public IndexAccessor getIndexAccessor(long companyId) {
386 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
387
388 if (indexAccessor != null) {
389 return indexAccessor;
390 }
391
392 synchronized (this) {
393 indexAccessor = _indexAccessors.get(companyId);
394
395 if (indexAccessor == null) {
396 indexAccessor = new IndexAccessorImpl(companyId);
397
398 if (isLoadIndexFromClusterEnabled()) {
399 indexAccessor = new SynchronizedIndexAccessorImpl(
400 indexAccessor);
401
402 boolean clusterForwardMessage = GetterUtil.getBoolean(
403 MessageValuesThreadLocal.getValue(
404 ClusterLink.CLUSTER_FORWARD_MESSAGE));
405
406 if (clusterForwardMessage) {
407 if (_log.isInfoEnabled()) {
408 _log.info(
409 "Skip Luncene index files cluster loading " +
410 "since this is a manual reindex request");
411 }
412 }
413 else {
414 try {
415 _loadIndexFromCluster(
416 indexAccessor,
417 indexAccessor.getLastGeneration());
418 }
419 catch (Exception e) {
420 _log.error(
421 "Unable to load index for company " +
422 indexAccessor.getCompanyId(),
423 e);
424 }
425 }
426 }
427
428 _indexAccessors.put(companyId, indexAccessor);
429 }
430 }
431
432 return indexAccessor;
433 }
434
435 @Override
436 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
437 IndexAccessor indexAccessor = getIndexAccessor(companyId);
438
439 return indexAccessor.acquireIndexSearcher();
440 }
441
442 @Override
443 public long getLastGeneration(long companyId) {
444 if (!isLoadIndexFromClusterEnabled()) {
445 return IndexAccessor.DEFAULT_LAST_GENERATION;
446 }
447
448 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
449
450 if (indexAccessor == null) {
451 return IndexAccessor.DEFAULT_LAST_GENERATION;
452 }
453
454 return indexAccessor.getLastGeneration();
455 }
456
457 @Override
458 public InputStream getLoadIndexesInputStreamFromCluster(
459 long companyId, Address bootupAddress) {
460
461 if (!isLoadIndexFromClusterEnabled()) {
462 return null;
463 }
464
465 InputStream inputStream = null;
466
467 try {
468 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
469 _getBootupClusterNodeObjectValuePair(bootupAddress);
470
471 URL url = bootupClusterNodeObjectValuePair.getValue();
472
473 URLConnection urlConnection = url.openConnection();
474
475 urlConnection.setDoOutput(true);
476
477 try (UnsyncPrintWriter unsyncPrintWriter =
478 UnsyncPrintWriterPool.borrow(
479 urlConnection.getOutputStream())) {
480
481 unsyncPrintWriter.write("transientToken=");
482 unsyncPrintWriter.write(
483 bootupClusterNodeObjectValuePair.getKey());
484 unsyncPrintWriter.write("&companyId=");
485 unsyncPrintWriter.write(String.valueOf(companyId));
486 }
487
488 inputStream = urlConnection.getInputStream();
489
490 return inputStream;
491 }
492 catch (IOException ioe) {
493 throw new SystemException(ioe);
494 }
495 }
496
497 @Override
498 public Set<String> getQueryTerms(Query query) {
499 String queryString = StringUtil.replace(
500 query.toString(), StringPool.STAR, StringPool.BLANK);
501
502 Query tempQuery = null;
503
504 try {
505 QueryParser queryParser = new QueryParser(
506 getVersion(), StringPool.BLANK, getAnalyzer());
507
508 tempQuery = queryParser.parse(queryString);
509 }
510 catch (Exception e) {
511 if (_log.isWarnEnabled()) {
512 _log.warn("Unable to parse " + queryString);
513 }
514
515 tempQuery = query;
516 }
517
518 WeightedTerm[] weightedTerms = null;
519
520 for (String fieldName : Field.KEYWORDS) {
521 weightedTerms = QueryTermExtractor.getTerms(
522 tempQuery, false, fieldName);
523
524 if (weightedTerms.length > 0) {
525 break;
526 }
527 }
528
529 Set<String> queryTerms = new HashSet<String>();
530
531 for (WeightedTerm weightedTerm : weightedTerms) {
532 queryTerms.add(weightedTerm.getTerm());
533 }
534
535 return queryTerms;
536 }
537
538
541 @Deprecated
542 @Override
543 public IndexSearcher getSearcher(long companyId, boolean readOnly)
544 throws IOException {
545
546 IndexAccessor indexAccessor = getIndexAccessor(companyId);
547
548 IndexReader indexReader = IndexReader.open(
549 indexAccessor.getLuceneDir(), readOnly);
550
551 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
552
553 indexSearcher.setDefaultFieldSortScoring(true, false);
554 indexSearcher.setSimilarity(new FieldWeightSimilarity());
555
556 return indexSearcher;
557 }
558
559 @Override
560 public String getSnippet(
561 Query query, String field, String s, int maxNumFragments,
562 int fragmentLength, String fragmentSuffix, Formatter formatter)
563 throws IOException {
564
565 QueryScorer queryScorer = new QueryScorer(query, field);
566
567 Highlighter highlighter = new Highlighter(formatter, queryScorer);
568
569 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
570
571 TokenStream tokenStream = getAnalyzer().tokenStream(
572 field, new UnsyncStringReader(s));
573
574 try {
575 String snippet = highlighter.getBestFragments(
576 tokenStream, s, maxNumFragments, fragmentSuffix);
577
578 if (Validator.isNotNull(snippet) &&
579 !StringUtil.endsWith(snippet, fragmentSuffix) &&
580 !s.equals(snippet)) {
581
582 snippet = snippet.concat(fragmentSuffix);
583 }
584
585 return snippet;
586 }
587 catch (InvalidTokenOffsetsException itoe) {
588 throw new IOException(itoe);
589 }
590 }
591
592 @Override
593 public Version getVersion() {
594 return _version;
595 }
596
597 @Override
598 public boolean isLoadIndexFromClusterEnabled() {
599 if (PropsValues.CLUSTER_LINK_ENABLED &&
600 PropsValues.LUCENE_REPLICATE_WRITE) {
601
602 return true;
603 }
604
605 if (_log.isDebugEnabled()) {
606 _log.debug("Load index from cluster is not enabled");
607 }
608
609 return false;
610 }
611
612 @Override
613 public void loadIndex(long companyId, InputStream inputStream)
614 throws IOException {
615
616 if (!isLoadIndexFromClusterEnabled()) {
617 return;
618 }
619
620 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
621
622 if (indexAccessor == null) {
623 if (_log.isInfoEnabled()) {
624 _log.info(
625 "Skip loading Lucene index files for company " + companyId +
626 " in favor of lazy loading");
627 }
628
629 return;
630 }
631
632 StopWatch stopWatch = new StopWatch();
633
634 stopWatch.start();
635
636 if (_log.isInfoEnabled()) {
637 _log.info(
638 "Start loading Lucene index files for company " + companyId);
639 }
640
641 indexAccessor.loadIndex(inputStream);
642
643 if (_log.isInfoEnabled()) {
644 _log.info(
645 "Finished loading index files for company " + companyId +
646 " in " + stopWatch.getTime() + " ms");
647 }
648 }
649
650 @Override
651 public void loadIndexesFromCluster(long companyId) {
652 if (!isLoadIndexFromClusterEnabled()) {
653 return;
654 }
655
656 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
657
658 if (indexAccessor == null) {
659 return;
660 }
661
662 long localLastGeneration = getLastGeneration(companyId);
663
664 _loadIndexFromCluster(indexAccessor, localLastGeneration);
665 }
666
667 @Override
668 public void releaseIndexSearcher(
669 long companyId, IndexSearcher indexSearcher)
670 throws IOException {
671
672 IndexAccessor indexAccessor = getIndexAccessor(companyId);
673
674 indexAccessor.releaseIndexSearcher(indexSearcher);
675 }
676
677 public void setAnalyzer(Analyzer analyzer) {
678 _analyzer = analyzer;
679 }
680
681 public void setVersion(Version version) {
682 _version = version;
683 }
684
685 @Override
686 public void shutdown() {
687 if (_luceneIndexThreadPoolExecutor != null) {
688 _luceneIndexThreadPoolExecutor.shutdownNow();
689
690 try {
691 _luceneIndexThreadPoolExecutor.awaitTermination(
692 60, TimeUnit.SECONDS);
693 }
694 catch (InterruptedException ie) {
695 _log.error("Lucene indexer shutdown interrupted", ie);
696 }
697 }
698
699 if (isLoadIndexFromClusterEnabled()) {
700 ClusterExecutorUtil.removeClusterEventListener(
701 _loadIndexClusterEventListener);
702 }
703
704 MessageBus messageBus = MessageBusUtil.getMessageBus();
705
706 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
707 String searchWriterDestinationName =
708 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
709
710 Destination searchWriteDestination = messageBus.getDestination(
711 searchWriterDestinationName);
712
713 if (searchWriteDestination != null) {
714 ThreadPoolExecutor threadPoolExecutor =
715 PortalExecutorManagerUtil.getPortalExecutor(
716 searchWriterDestinationName);
717
718 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
719
720 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
721
722 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
723 countDownLatch);
724
725 for (int i = 0; i < maxPoolSize; i++) {
726 threadPoolExecutor.submit(shutdownSyncJob);
727 }
728
729 try {
730 countDownLatch.await();
731 }
732 catch (InterruptedException ie) {
733 _log.error("Shutdown waiting interrupted", ie);
734 }
735
736 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
737
738 if (_log.isDebugEnabled()) {
739 _log.debug(
740 "Cancelled appending indexing jobs: " + runnables);
741 }
742
743 searchWriteDestination.close(true);
744 }
745 }
746
747 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
748 indexAccessor.close();
749 }
750 }
751
752 @Override
753 public void shutdown(long companyId) {
754 IndexAccessor indexAccessor = getIndexAccessor(companyId);
755
756 _indexAccessors.remove(companyId);
757
758 indexAccessor.close();
759 }
760
761 @Override
762 public void startup(long companyId) {
763 if (!PropsValues.INDEX_ON_STARTUP) {
764 return;
765 }
766
767 if (_log.isInfoEnabled()) {
768 _log.info("Indexing Lucene on startup");
769 }
770
771 SearchEngineInitializer searchEngineInitializer =
772 new SearchEngineInitializer(companyId);
773
774 if (PropsValues.INDEX_WITH_THREAD) {
775 if (_luceneIndexThreadPoolExecutor == null) {
776
777
778
779
780
781 _luceneIndexThreadPoolExecutor =
782 PortalExecutorManagerUtil.getPortalExecutor(
783 LuceneHelperImpl.class.getName());
784 }
785
786 _luceneIndexThreadPoolExecutor.execute(searchEngineInitializer);
787 }
788 else {
789 searchEngineInitializer.reindex();
790 }
791 }
792
793 @Override
794 public void updateDocument(long companyId, Term term, Document document)
795 throws IOException {
796
797 IndexAccessor indexAccessor = getIndexAccessor(companyId);
798
799 indexAccessor.updateDocument(term, document);
800 }
801
802 private LuceneHelperImpl() {
803 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
804 _luceneIndexThreadPoolExecutor =
805 PortalExecutorManagerUtil.getPortalExecutor(
806 LuceneHelperImpl.class.getName());
807 }
808
809 if (isLoadIndexFromClusterEnabled()) {
810 _loadIndexClusterEventListener =
811 new LoadIndexClusterEventListener();
812
813 ClusterExecutorUtil.addClusterEventListener(
814 _loadIndexClusterEventListener);
815 }
816
817 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
818
819 if (StringUtil.equalsIgnoreCase(
820 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
821
822 _protocol = Http.HTTPS;
823 }
824 else {
825 _protocol = Http.HTTP;
826 }
827 }
828
829 private ObjectValuePair<String, URL>
830 _getBootupClusterNodeObjectValuePair(Address bootupAddress) {
831
832 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
833 new MethodHandler(
834 _createTokenMethodKey,
835 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
836 bootupAddress);
837
838 FutureClusterResponses futureClusterResponses =
839 ClusterExecutorUtil.execute(clusterRequest);
840
841 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
842 futureClusterResponses.getPartialResults();
843
844 try {
845 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
846 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
847 TimeUnit.MILLISECONDS);
848
849 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
850
851 InetSocketAddress inetSocketAddress =
852 clusterNode.getPortalInetSocketAddress();
853
854 if (inetSocketAddress == null) {
855 StringBundler sb = new StringBundler(6);
856
857 sb.append("Invalid cluster node InetSocketAddress ");
858 sb.append(". The InetSocketAddress is set by the first ");
859 sb.append("request or configured in portal.properties by the");
860 sb.append("properties ");
861 sb.append("\"portal.instance.http.inet.socket.address\" and ");
862 sb.append("\"portal.instance.https.inet.socket.address\".");
863
864 throw new Exception(sb.toString());
865 }
866
867 InetAddress inetAddress = inetSocketAddress.getAddress();
868
869 String fileName = PortalUtil.getPathContext();
870
871 if (!fileName.endsWith(StringPool.SLASH)) {
872 fileName = fileName.concat(StringPool.SLASH);
873 }
874
875 fileName = fileName.concat("lucene/dump");
876
877 URL url = new URL(
878 _protocol, inetAddress.getHostAddress(),
879 inetSocketAddress.getPort(), fileName);
880
881 String transientToken = (String)clusterNodeResponse.getResult();
882
883 return new ObjectValuePair<String, URL>(transientToken, url);
884 }
885 catch (Exception e) {
886 throw new SystemException(e);
887 }
888 }
889
890 private void _includeIfUnique(
891 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
892 Query query, BooleanClause.Occur occur) {
893
894 if (query instanceof TermQuery) {
895 Set<Term> terms = new HashSet<Term>();
896
897 TermQuery termQuery = (TermQuery)query;
898
899 termQuery.extractTerms(terms);
900
901 float boost = termQuery.getBoost();
902
903 for (Term term : terms) {
904 String termValue = term.text();
905
906 if (like) {
907 termValue = termValue.toLowerCase(queryParser.getLocale());
908
909 term = term.createTerm(
910 StringPool.STAR.concat(termValue).concat(
911 StringPool.STAR));
912
913 query = new WildcardQuery(term);
914 }
915 else {
916 query = new TermQuery(term);
917 }
918
919 query.setBoost(boost);
920
921 boolean included = false;
922
923 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
924 if (query.equals(booleanClause.getQuery())) {
925 included = true;
926 }
927 }
928
929 if (!included) {
930 booleanQuery.add(query, occur);
931 }
932 }
933 }
934 else if (query instanceof BooleanQuery) {
935 BooleanQuery curBooleanQuery = (BooleanQuery)query;
936
937 BooleanQuery containerBooleanQuery = new BooleanQuery();
938
939 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
940 _includeIfUnique(
941 containerBooleanQuery, like, queryParser,
942 booleanClause.getQuery(), booleanClause.getOccur());
943 }
944
945 if (containerBooleanQuery.getClauses().length > 0) {
946 booleanQuery.add(containerBooleanQuery, occur);
947 }
948 }
949 else {
950 boolean included = false;
951
952 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
953 if (query.equals(booleanClause.getQuery())) {
954 included = true;
955 }
956 }
957
958 if (!included) {
959 booleanQuery.add(query, occur);
960 }
961 }
962 }
963
964 private void _loadIndexFromCluster(
965 final IndexAccessor indexAccessor, long localLastGeneration) {
966
967 List<Address> clusterNodeAddresses =
968 ClusterExecutorUtil.getClusterNodeAddresses();
969
970 int clusterNodeAddressesCount = clusterNodeAddresses.size();
971
972 if (clusterNodeAddressesCount <= 1) {
973 if (_log.isDebugEnabled()) {
974 _log.debug(
975 "Do not load indexes because there is either one portal " +
976 "instance or no portal instances in the cluster");
977 }
978
979 return;
980 }
981
982 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
983 new MethodHandler(
984 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
985 true);
986
987 FutureClusterResponses futureClusterResponses =
988 ClusterExecutorUtil.execute(
989 clusterRequest,
990 new LoadIndexClusterResponseCallback(
991 indexAccessor, clusterNodeAddressesCount,
992 localLastGeneration));
993
994 futureClusterResponses.addFutureListener(
995 new BaseFutureListener<ClusterNodeResponses>() {
996
997 @Override
998 public void completeWithException(
999 Future<ClusterNodeResponses> future, Throwable throwable) {
1000
1001 _log.error(
1002 "Unable to load index for company " +
1003 indexAccessor.getCompanyId(),
1004 throwable);
1005 }
1006
1007 });
1008 }
1009
1010 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1011 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1012
1013 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1014 GetterUtil.getInteger(
1015 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1016 BooleanQuery.getMaxClauseCount());
1017
1018 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1019
1020 private static MethodKey _createTokenMethodKey = new MethodKey(
1021 TransientTokenUtil.class, "createToken", long.class);
1022 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1023 LuceneHelperUtil.class, "getLastGeneration", long.class);
1024
1025 private Analyzer _analyzer;
1026 private Map<Long, IndexAccessor> _indexAccessors =
1027 new ConcurrentHashMap<Long, IndexAccessor>();
1028 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1029 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1030 private String _protocol;
1031 private Version _version;
1032
1033 private static class ShutdownSyncJob implements Runnable {
1034
1035 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1036 _countDownLatch = countDownLatch;
1037 }
1038
1039 @Override
1040 public void run() {
1041 _countDownLatch.countDown();
1042
1043 try {
1044 synchronized (this) {
1045 wait();
1046 }
1047 }
1048 catch (InterruptedException ie) {
1049 }
1050 }
1051
1052 private final CountDownLatch _countDownLatch;
1053
1054 }
1055
1056 private class LoadIndexClusterEventListener
1057 implements ClusterEventListener {
1058
1059 @Override
1060 public void processClusterEvent(ClusterEvent clusterEvent) {
1061 ClusterEventType clusterEventType =
1062 clusterEvent.getClusterEventType();
1063
1064 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1065 return;
1066 }
1067
1068 List<Address> clusterNodeAddresses =
1069 ClusterExecutorUtil.getClusterNodeAddresses();
1070 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1071
1072 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1073 if (_log.isDebugEnabled()) {
1074 _log.debug(
1075 "Number of original cluster members is greater than " +
1076 "one");
1077 }
1078
1079 return;
1080 }
1081
1082 long[] companyIds = PortalInstances.getCompanyIds();
1083
1084 for (long companyId : companyIds) {
1085 loadIndexes(companyId);
1086 }
1087
1088 loadIndexes(CompanyConstants.SYSTEM);
1089 }
1090
1091 private void loadIndexes(long companyId) {
1092 long lastGeneration = getLastGeneration(companyId);
1093
1094 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1095 return;
1096 }
1097
1098 try {
1099 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1100 }
1101 catch (Exception e) {
1102 _log.error(
1103 "Unable to load indexes for company " + companyId, e);
1104 }
1105 }
1106
1107 }
1108
1109 private class LoadIndexClusterResponseCallback
1110 implements ClusterResponseCallback {
1111
1112 public LoadIndexClusterResponseCallback(
1113 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1114 long localLastGeneration) {
1115
1116 _indexAccessor = indexAccessor;
1117 _clusterNodeAddressesCount = clusterNodeAddressesCount;
1118 _localLastGeneration = localLastGeneration;
1119
1120 _companyId = _indexAccessor.getCompanyId();
1121 }
1122
1123 @Override
1124 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1125 Address bootupAddress = null;
1126
1127 do {
1128 _clusterNodeAddressesCount--;
1129
1130 ClusterNodeResponse clusterNodeResponse = null;
1131
1132 try {
1133 clusterNodeResponse = blockingQueue.poll(
1134 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1135 TimeUnit.MILLISECONDS);
1136 }
1137 catch (Exception e) {
1138 _log.error("Unable to get cluster node response", e);
1139 }
1140
1141 if (clusterNodeResponse == null) {
1142 if (_log.isDebugEnabled()) {
1143 _log.debug(
1144 "Unable to get cluster node response in " +
1145 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1146 TimeUnit.MILLISECONDS);
1147 }
1148
1149 continue;
1150 }
1151
1152 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1153
1154 if (clusterNode.getPortalInetSocketAddress() != null) {
1155 try {
1156 long remoteLastGeneration =
1157 (Long)clusterNodeResponse.getResult();
1158
1159 if (remoteLastGeneration > _localLastGeneration) {
1160 bootupAddress = clusterNodeResponse.getAddress();
1161
1162 break;
1163 }
1164 }
1165 catch (Exception e) {
1166 if (_log.isDebugEnabled()) {
1167 _log.debug(
1168 "Suppress exception caused by remote method " +
1169 "invocation",
1170 e);
1171 }
1172
1173 continue;
1174 }
1175 }
1176 else if (_log.isDebugEnabled()) {
1177 _log.debug(
1178 "Cluster node " + clusterNode +
1179 " has invalid InetSocketAddress");
1180 }
1181 }
1182 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1183
1184 if (bootupAddress == null) {
1185 return;
1186 }
1187
1188 if (_log.isInfoEnabled()) {
1189 _log.info(
1190 "Start loading lucene index files from cluster node " +
1191 bootupAddress);
1192 }
1193
1194 InputStream inputStream = null;
1195
1196 try {
1197 inputStream = getLoadIndexesInputStreamFromCluster(
1198 _companyId, bootupAddress);
1199
1200 _indexAccessor.loadIndex(inputStream);
1201
1202 if (_log.isInfoEnabled()) {
1203 _log.info("Lucene index files loaded successfully");
1204 }
1205 }
1206 catch (Exception e) {
1207 _log.error("Unable to load index for company " + _companyId, e);
1208 }
1209 finally {
1210 if (inputStream != null) {
1211 try {
1212 inputStream.close();
1213 }
1214 catch (IOException ioe) {
1215 _log.error(
1216 "Unable to close input stream for company " +
1217 _companyId,
1218 ioe);
1219 }
1220 }
1221 }
1222 }
1223
1224 private int _clusterNodeAddressesCount;
1225 private long _companyId;
1226 private IndexAccessor _indexAccessor;
1227 private long _localLastGeneration;
1228
1229 }
1230
1231 }