001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Destination;
036 import com.liferay.portal.kernel.messaging.MessageBus;
037 import com.liferay.portal.kernel.messaging.MessageBusUtil;
038 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039 import com.liferay.portal.kernel.search.BooleanClauseOccur;
040 import com.liferay.portal.kernel.search.Field;
041 import com.liferay.portal.kernel.search.SearchEngineUtil;
042 import com.liferay.portal.kernel.util.ArrayUtil;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.Http;
045 import com.liferay.portal.kernel.util.MethodHandler;
046 import com.liferay.portal.kernel.util.MethodKey;
047 import com.liferay.portal.kernel.util.ObjectValuePair;
048 import com.liferay.portal.kernel.util.PropsKeys;
049 import com.liferay.portal.kernel.util.StringBundler;
050 import com.liferay.portal.kernel.util.StringPool;
051 import com.liferay.portal.kernel.util.StringUtil;
052 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053 import com.liferay.portal.kernel.util.Validator;
054 import com.liferay.portal.model.CompanyConstants;
055 import com.liferay.portal.search.SearchEngineInitializer;
056 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
057 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
058 import com.liferay.portal.security.auth.TransientTokenUtil;
059 import com.liferay.portal.util.PortalInstances;
060 import com.liferay.portal.util.PortalUtil;
061 import com.liferay.portal.util.PropsUtil;
062 import com.liferay.portal.util.PropsValues;
063
064 import java.io.IOException;
065 import java.io.InputStream;
066 import java.io.OutputStream;
067
068 import java.net.InetAddress;
069 import java.net.InetSocketAddress;
070 import java.net.URL;
071 import java.net.URLConnection;
072
073 import java.util.HashSet;
074 import java.util.List;
075 import java.util.Map;
076 import java.util.Set;
077 import java.util.concurrent.BlockingQueue;
078 import java.util.concurrent.ConcurrentHashMap;
079 import java.util.concurrent.CountDownLatch;
080 import java.util.concurrent.TimeUnit;
081 import java.util.concurrent.TimeoutException;
082
083 import org.apache.commons.lang.time.StopWatch;
084 import org.apache.lucene.analysis.Analyzer;
085 import org.apache.lucene.analysis.TokenStream;
086 import org.apache.lucene.document.Document;
087 import org.apache.lucene.index.IndexReader;
088 import org.apache.lucene.index.Term;
089 import org.apache.lucene.queryParser.QueryParser;
090 import org.apache.lucene.search.BooleanClause;
091 import org.apache.lucene.search.BooleanQuery;
092 import org.apache.lucene.search.IndexSearcher;
093 import org.apache.lucene.search.NumericRangeQuery;
094 import org.apache.lucene.search.Query;
095 import org.apache.lucene.search.TermQuery;
096 import org.apache.lucene.search.TermRangeQuery;
097 import org.apache.lucene.search.WildcardQuery;
098 import org.apache.lucene.search.highlight.Formatter;
099 import org.apache.lucene.search.highlight.Highlighter;
100 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
101 import org.apache.lucene.search.highlight.QueryScorer;
102 import org.apache.lucene.search.highlight.SimpleFragmenter;
103 import org.apache.lucene.search.highlight.WeightedTerm;
104 import org.apache.lucene.util.Version;
105
106
115 public class LuceneHelperImpl implements LuceneHelper {
116
117 @Override
118 public void addDocument(long companyId, Document document)
119 throws IOException {
120
121 IndexAccessor indexAccessor = getIndexAccessor(companyId);
122
123 indexAccessor.addDocument(document);
124 }
125
126 @Override
127 public void addExactTerm(
128 BooleanQuery booleanQuery, String field, String value) {
129
130 addTerm(booleanQuery, field, value, false);
131 }
132
133 @Override
134 public void addNumericRangeTerm(
135 BooleanQuery booleanQuery, String field, Integer startValue,
136 Integer endValue) {
137
138 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
139 field, startValue, endValue, true, true);
140
141 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
142 }
143
144 @Override
145 public void addNumericRangeTerm(
146 BooleanQuery booleanQuery, String field, Long startValue,
147 Long endValue) {
148
149 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
150 field, startValue, endValue, true, true);
151
152 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
153 }
154
155
159 @Deprecated
160 @Override
161 public void addNumericRangeTerm(
162 BooleanQuery booleanQuery, String field, String startValue,
163 String endValue) {
164
165 addNumericRangeTerm(
166 booleanQuery, field, GetterUtil.getLong(startValue),
167 GetterUtil.getLong(endValue));
168 }
169
170 @Override
171 public void addRangeTerm(
172 BooleanQuery booleanQuery, String field, String startValue,
173 String endValue) {
174
175 boolean includesLower = true;
176
177 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
178 includesLower = false;
179 }
180
181 boolean includesUpper = true;
182
183 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
184 includesUpper = false;
185 }
186
187 TermRangeQuery termRangeQuery = new TermRangeQuery(
188 field, startValue, endValue, includesLower, includesUpper);
189
190 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
191 }
192
193 @Override
194 public void addRequiredTerm(
195 BooleanQuery booleanQuery, String field, String value, boolean like) {
196
197 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
198 }
199
200 @Override
201 public void addRequiredTerm(
202 BooleanQuery booleanQuery, String field, String[] values,
203 boolean like) {
204
205 if (values == null) {
206 return;
207 }
208
209 BooleanQuery query = new BooleanQuery();
210
211 for (String value : values) {
212 addTerm(query, field, value, like);
213 }
214
215 booleanQuery.add(query, BooleanClause.Occur.MUST);
216 }
217
218 @Override
219 public void addTerm(
220 BooleanQuery booleanQuery, String field, String value, boolean like) {
221
222 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
223 }
224
225 @Override
226 public void addTerm(
227 BooleanQuery booleanQuery, String field, String value, boolean like,
228 BooleanClauseOccur booleanClauseOccur) {
229
230 if (Validator.isNull(value)) {
231 return;
232 }
233
234 Analyzer analyzer = getAnalyzer();
235
236 if (analyzer instanceof PerFieldAnalyzer) {
237 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
238
239 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
240
241 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
242 like = true;
243 }
244 }
245
246 if (like) {
247 value = StringUtil.replace(
248 value, StringPool.PERCENT, StringPool.BLANK);
249 }
250
251 try {
252 QueryParser queryParser = new QueryParser(
253 getVersion(), field, analyzer);
254
255 Query query = queryParser.parse(value);
256
257 BooleanClause.Occur occur = null;
258
259 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
260 occur = BooleanClause.Occur.MUST;
261 }
262 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
263 occur = BooleanClause.Occur.MUST_NOT;
264 }
265 else {
266 occur = BooleanClause.Occur.SHOULD;
267 }
268
269 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
270 }
271 catch (Exception e) {
272 if (_log.isWarnEnabled()) {
273 _log.warn(e, e);
274 }
275 }
276 }
277
278 @Override
279 public void addTerm(
280 BooleanQuery booleanQuery, String field, String[] values,
281 boolean like) {
282
283 for (String value : values) {
284 addTerm(booleanQuery, field, value, like);
285 }
286 }
287
288
292 @Deprecated
293 @Override
294 public void cleanUp(IndexSearcher indexSearcher) {
295 if (indexSearcher == null) {
296 return;
297 }
298
299 try {
300 indexSearcher.close();
301
302 IndexReader indexReader = indexSearcher.getIndexReader();
303
304 if (indexReader != null) {
305 indexReader.close();
306 }
307 }
308 catch (IOException ioe) {
309 _log.error(ioe, ioe);
310 }
311 }
312
313 @Override
314 public int countScoredFieldNames(Query query, String[] filedNames) {
315 int count = 0;
316
317 for (String fieldName : filedNames) {
318 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
319 query, false, fieldName);
320
321 if ((weightedTerms.length > 0) &&
322 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
323
324 count++;
325 }
326 }
327
328 return count;
329 }
330
331 @Override
332 public void delete(long companyId) {
333 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
334
335 if (indexAccessor == null) {
336 return;
337 }
338
339 indexAccessor.delete();
340 }
341
342 @Override
343 public void deleteDocuments(long companyId, Term term) throws IOException {
344 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
345
346 if (indexAccessor == null) {
347 return;
348 }
349
350 indexAccessor.deleteDocuments(term);
351 }
352
353 @Override
354 public void dumpIndex(long companyId, OutputStream outputStream)
355 throws IOException {
356
357 long lastGeneration = getLastGeneration(companyId);
358
359 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
360 if (_log.isDebugEnabled()) {
361 _log.debug(
362 "Dump index from cluster is not enabled for " + companyId);
363 }
364
365 return;
366 }
367
368 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
369
370 if (indexAccessor == null) {
371 return;
372 }
373
374 indexAccessor.dumpIndex(outputStream);
375 }
376
/**
 * Returns the analyzer configured on this helper.
 *
 * @return the analyzer assigned through {@link #setAnalyzer(Analyzer)}
 */
@Override
public Analyzer getAnalyzer() {
	return _analyzer;
}
381
382 @Override
383 public IndexAccessor getIndexAccessor(long companyId) {
384 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
385
386 if (indexAccessor != null) {
387 return indexAccessor;
388 }
389
390 synchronized (this) {
391 indexAccessor = _indexAccessors.get(companyId);
392
393 if (indexAccessor == null) {
394 indexAccessor = new IndexAccessorImpl(companyId);
395
396 if (isLoadIndexFromClusterEnabled()) {
397 indexAccessor = new SynchronizedIndexAccessorImpl(
398 indexAccessor);
399
400 boolean clusterForwardMessage = GetterUtil.getBoolean(
401 MessageValuesThreadLocal.getValue(
402 ClusterLink.CLUSTER_FORWARD_MESSAGE));
403
404 if (clusterForwardMessage) {
405 if (_log.isInfoEnabled()) {
406 _log.info(
407 "Skip Luncene index files cluster loading " +
408 "since this is a manual reindex request");
409 }
410 }
411 else {
412 try {
413 _loadIndexFromCluster(
414 indexAccessor,
415 indexAccessor.getLastGeneration());
416 }
417 catch (Exception e) {
418 _log.error(
419 "Unable to load index for company " +
420 indexAccessor.getCompanyId(),
421 e);
422 }
423 }
424 }
425
426 _indexAccessors.put(companyId, indexAccessor);
427 }
428 }
429
430 return indexAccessor;
431 }
432
433 @Override
434 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
435 IndexAccessor indexAccessor = getIndexAccessor(companyId);
436
437 return indexAccessor.acquireIndexSearcher();
438 }
439
440 @Override
441 public long getLastGeneration(long companyId) {
442 if (!isLoadIndexFromClusterEnabled()) {
443 return IndexAccessor.DEFAULT_LAST_GENERATION;
444 }
445
446 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
447
448 if (indexAccessor == null) {
449 return IndexAccessor.DEFAULT_LAST_GENERATION;
450 }
451
452 return indexAccessor.getLastGeneration();
453 }
454
455 @Override
456 public InputStream getLoadIndexesInputStreamFromCluster(
457 long companyId, Address bootupAddress) {
458
459 if (!isLoadIndexFromClusterEnabled()) {
460 return null;
461 }
462
463 InputStream inputStream = null;
464
465 try {
466 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
467 _getBootupClusterNodeObjectValuePair(bootupAddress);
468
469 URL url = bootupClusterNodeObjectValuePair.getValue();
470
471 URLConnection urlConnection = url.openConnection();
472
473 urlConnection.setDoOutput(true);
474
475 try (UnsyncPrintWriter unsyncPrintWriter =
476 UnsyncPrintWriterPool.borrow(
477 urlConnection.getOutputStream())) {
478
479 unsyncPrintWriter.write("transientToken=");
480 unsyncPrintWriter.write(
481 bootupClusterNodeObjectValuePair.getKey());
482 unsyncPrintWriter.write("&companyId=");
483 unsyncPrintWriter.write(String.valueOf(companyId));
484 }
485
486 inputStream = urlConnection.getInputStream();
487
488 return inputStream;
489 }
490 catch (IOException ioe) {
491 throw new SystemException(ioe);
492 }
493 }
494
495 @Override
496 public Set<String> getQueryTerms(Query query) {
497 String queryString = StringUtil.replace(
498 query.toString(), StringPool.STAR, StringPool.BLANK);
499
500 Query tempQuery = null;
501
502 try {
503 QueryParser queryParser = new QueryParser(
504 getVersion(), StringPool.BLANK, getAnalyzer());
505
506 tempQuery = queryParser.parse(queryString);
507 }
508 catch (Exception e) {
509 if (_log.isWarnEnabled()) {
510 _log.warn("Unable to parse " + queryString);
511 }
512
513 tempQuery = query;
514 }
515
516 WeightedTerm[] weightedTerms = null;
517
518 for (String fieldName : Field.KEYWORDS) {
519 weightedTerms = QueryTermExtractor.getTerms(
520 tempQuery, false, fieldName);
521
522 if (weightedTerms.length > 0) {
523 break;
524 }
525 }
526
527 Set<String> queryTerms = new HashSet<String>();
528
529 for (WeightedTerm weightedTerm : weightedTerms) {
530 queryTerms.add(weightedTerm.getTerm());
531 }
532
533 return queryTerms;
534 }
535
536
539 @Deprecated
540 @Override
541 public IndexSearcher getSearcher(long companyId, boolean readOnly)
542 throws IOException {
543
544 IndexAccessor indexAccessor = getIndexAccessor(companyId);
545
546 IndexReader indexReader = IndexReader.open(
547 indexAccessor.getLuceneDir(), readOnly);
548
549 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
550
551 indexSearcher.setDefaultFieldSortScoring(true, false);
552 indexSearcher.setSimilarity(new FieldWeightSimilarity());
553
554 return indexSearcher;
555 }
556
557 @Override
558 public String getSnippet(
559 Query query, String field, String s, int maxNumFragments,
560 int fragmentLength, String fragmentSuffix, Formatter formatter)
561 throws IOException {
562
563 QueryScorer queryScorer = new QueryScorer(query, field);
564
565 Highlighter highlighter = new Highlighter(formatter, queryScorer);
566
567 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
568
569 TokenStream tokenStream = getAnalyzer().tokenStream(
570 field, new UnsyncStringReader(s));
571
572 try {
573 String snippet = highlighter.getBestFragments(
574 tokenStream, s, maxNumFragments, fragmentSuffix);
575
576 if (Validator.isNotNull(snippet) &&
577 !StringUtil.endsWith(snippet, fragmentSuffix) &&
578 !s.equals(snippet)) {
579
580 snippet = snippet.concat(fragmentSuffix);
581 }
582
583 return snippet;
584 }
585 catch (InvalidTokenOffsetsException itoe) {
586 throw new IOException(itoe);
587 }
588 }
589
/**
 * Returns the Lucene version configured on this helper.
 *
 * @return the version assigned through {@link #setVersion(Version)}
 */
@Override
public Version getVersion() {
	return _version;
}
594
595 @Override
596 public boolean isLoadIndexFromClusterEnabled() {
597 if (PropsValues.CLUSTER_LINK_ENABLED &&
598 PropsValues.LUCENE_REPLICATE_WRITE) {
599
600 return true;
601 }
602
603 if (_log.isDebugEnabled()) {
604 _log.debug("Load index from cluster is not enabled");
605 }
606
607 return false;
608 }
609
610 @Override
611 public void loadIndex(long companyId, InputStream inputStream)
612 throws IOException {
613
614 if (!isLoadIndexFromClusterEnabled()) {
615 return;
616 }
617
618 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
619
620 if (indexAccessor == null) {
621 if (_log.isInfoEnabled()) {
622 _log.info(
623 "Skip loading Lucene index files for company " + companyId +
624 " in favor of lazy loading");
625 }
626
627 return;
628 }
629
630 StopWatch stopWatch = new StopWatch();
631
632 stopWatch.start();
633
634 if (_log.isInfoEnabled()) {
635 _log.info(
636 "Start loading Lucene index files for company " + companyId);
637 }
638
639 indexAccessor.loadIndex(inputStream);
640
641 if (_log.isInfoEnabled()) {
642 _log.info(
643 "Finished loading index files for company " + companyId +
644 " in " + stopWatch.getTime() + " ms");
645 }
646 }
647
648 @Override
649 public void loadIndexesFromCluster(long companyId) {
650 if (!isLoadIndexFromClusterEnabled()) {
651 return;
652 }
653
654 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
655
656 if (indexAccessor == null) {
657 return;
658 }
659
660 long localLastGeneration = getLastGeneration(companyId);
661
662 _loadIndexFromCluster(indexAccessor, localLastGeneration);
663 }
664
665 @Override
666 public void releaseIndexSearcher(
667 long companyId, IndexSearcher indexSearcher)
668 throws IOException {
669
670 IndexAccessor indexAccessor = getIndexAccessor(companyId);
671
672 indexAccessor.releaseIndexSearcher(indexSearcher);
673 }
674
/**
 * Sets the analyzer returned by {@link #getAnalyzer()}.
 *
 * @param analyzer the analyzer to use
 */
public void setAnalyzer(Analyzer analyzer) {
	_analyzer = analyzer;
}
678
/**
 * Sets the Lucene version returned by {@link #getVersion()}.
 *
 * @param version the Lucene version to use
 */
public void setVersion(Version version) {
	_version = version;
}
682
683 @Override
684 public void shutdown() {
685 if (_luceneIndexThreadPoolExecutor != null) {
686 _luceneIndexThreadPoolExecutor.shutdownNow();
687
688 try {
689 _luceneIndexThreadPoolExecutor.awaitTermination(
690 60, TimeUnit.SECONDS);
691 }
692 catch (InterruptedException ie) {
693 _log.error("Lucene indexer shutdown interrupted", ie);
694 }
695 }
696
697 if (isLoadIndexFromClusterEnabled()) {
698 ClusterExecutorUtil.removeClusterEventListener(
699 _loadIndexClusterEventListener);
700 }
701
702 MessageBus messageBus = MessageBusUtil.getMessageBus();
703
704 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
705 String searchWriterDestinationName =
706 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
707
708 Destination searchWriteDestination = messageBus.getDestination(
709 searchWriterDestinationName);
710
711 if (searchWriteDestination != null) {
712 ThreadPoolExecutor threadPoolExecutor =
713 PortalExecutorManagerUtil.getPortalExecutor(
714 searchWriterDestinationName);
715
716 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
717
718 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
719
720 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
721 countDownLatch);
722
723 for (int i = 0; i < maxPoolSize; i++) {
724 threadPoolExecutor.submit(shutdownSyncJob);
725 }
726
727 try {
728 countDownLatch.await();
729 }
730 catch (InterruptedException ie) {
731 _log.error("Shutdown waiting interrupted", ie);
732 }
733
734 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
735
736 if (_log.isDebugEnabled()) {
737 _log.debug(
738 "Cancelled appending indexing jobs: " + runnables);
739 }
740
741 searchWriteDestination.close(true);
742 }
743 }
744
745 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
746 indexAccessor.close();
747 }
748 }
749
750 @Override
751 public void shutdown(long companyId) {
752 IndexAccessor indexAccessor = getIndexAccessor(companyId);
753
754 _indexAccessors.remove(companyId);
755
756 indexAccessor.close();
757 }
758
759 @Override
760 public void startup(long companyId) {
761 if (!PropsValues.INDEX_ON_STARTUP) {
762 return;
763 }
764
765 if (_log.isInfoEnabled()) {
766 _log.info("Indexing Lucene on startup");
767 }
768
769 SearchEngineInitializer searchEngineInitializer =
770 new SearchEngineInitializer(companyId);
771
772 if (PropsValues.INDEX_WITH_THREAD) {
773 if (_luceneIndexThreadPoolExecutor == null) {
774
775
776
777
778
779 _luceneIndexThreadPoolExecutor =
780 PortalExecutorManagerUtil.getPortalExecutor(
781 LuceneHelperImpl.class.getName());
782 }
783
784 _luceneIndexThreadPoolExecutor.execute(searchEngineInitializer);
785 }
786 else {
787 searchEngineInitializer.reindex();
788 }
789 }
790
791 @Override
792 public void updateDocument(long companyId, Term term, Document document)
793 throws IOException {
794
795 IndexAccessor indexAccessor = getIndexAccessor(companyId);
796
797 indexAccessor.updateDocument(term, document);
798 }
799
800 private LuceneHelperImpl() {
801 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
802 _luceneIndexThreadPoolExecutor =
803 PortalExecutorManagerUtil.getPortalExecutor(
804 LuceneHelperImpl.class.getName());
805 }
806
807 if (isLoadIndexFromClusterEnabled()) {
808 _loadIndexClusterEventListener =
809 new LoadIndexClusterEventListener();
810
811 ClusterExecutorUtil.addClusterEventListener(
812 _loadIndexClusterEventListener);
813 }
814
815 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
816
817 if (StringUtil.equalsIgnoreCase(
818 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
819
820 _protocol = Http.HTTPS;
821 }
822 else {
823 _protocol = Http.HTTP;
824 }
825 }
826
827 private ObjectValuePair<String, URL>
828 _getBootupClusterNodeObjectValuePair(Address bootupAddress) {
829
830 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
831 new MethodHandler(
832 _createTokenMethodKey,
833 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
834 bootupAddress);
835
836 FutureClusterResponses futureClusterResponses =
837 ClusterExecutorUtil.execute(clusterRequest);
838
839 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
840 futureClusterResponses.getPartialResults();
841
842 try {
843 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
844 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
845 TimeUnit.MILLISECONDS);
846
847 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
848
849 InetSocketAddress inetSocketAddress =
850 clusterNode.getPortalInetSocketAddress();
851
852 if (inetSocketAddress == null) {
853 StringBundler sb = new StringBundler(6);
854
855 sb.append("Invalid cluster node InetSocketAddress ");
856 sb.append(". The InetSocketAddress is set by the first ");
857 sb.append("request or configured in portal.properties by the");
858 sb.append("properties ");
859 sb.append("\"portal.instance.http.inet.socket.address\" and ");
860 sb.append("\"portal.instance.https.inet.socket.address\".");
861
862 throw new Exception(sb.toString());
863 }
864
865 InetAddress inetAddress = inetSocketAddress.getAddress();
866
867 String fileName = PortalUtil.getPathContext();
868
869 if (!fileName.endsWith(StringPool.SLASH)) {
870 fileName = fileName.concat(StringPool.SLASH);
871 }
872
873 fileName = fileName.concat("lucene/dump");
874
875 URL url = new URL(
876 _protocol, inetAddress.getHostAddress(),
877 inetSocketAddress.getPort(), fileName);
878
879 String transientToken = (String)clusterNodeResponse.getResult();
880
881 return new ObjectValuePair<String, URL>(transientToken, url);
882 }
883 catch (Exception e) {
884 throw new SystemException(e);
885 }
886 }
887
888 private void _includeIfUnique(
889 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
890 Query query, BooleanClause.Occur occur) {
891
892 if (query instanceof TermQuery) {
893 Set<Term> terms = new HashSet<Term>();
894
895 TermQuery termQuery = (TermQuery)query;
896
897 termQuery.extractTerms(terms);
898
899 float boost = termQuery.getBoost();
900
901 for (Term term : terms) {
902 String termValue = term.text();
903
904 if (like) {
905 termValue = termValue.toLowerCase(queryParser.getLocale());
906
907 term = term.createTerm(
908 StringPool.STAR.concat(termValue).concat(
909 StringPool.STAR));
910
911 query = new WildcardQuery(term);
912 }
913 else {
914 query = new TermQuery(term);
915 }
916
917 query.setBoost(boost);
918
919 boolean included = false;
920
921 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
922 if (query.equals(booleanClause.getQuery())) {
923 included = true;
924 }
925 }
926
927 if (!included) {
928 booleanQuery.add(query, occur);
929 }
930 }
931 }
932 else if (query instanceof BooleanQuery) {
933 BooleanQuery curBooleanQuery = (BooleanQuery)query;
934
935 BooleanQuery containerBooleanQuery = new BooleanQuery();
936
937 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
938 _includeIfUnique(
939 containerBooleanQuery, like, queryParser,
940 booleanClause.getQuery(), booleanClause.getOccur());
941 }
942
943 if (containerBooleanQuery.getClauses().length > 0) {
944 booleanQuery.add(containerBooleanQuery, occur);
945 }
946 }
947 else {
948 boolean included = false;
949
950 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
951 if (query.equals(booleanClause.getQuery())) {
952 included = true;
953 }
954 }
955
956 if (!included) {
957 booleanQuery.add(query, occur);
958 }
959 }
960 }
961
962 private void _loadIndexFromCluster(
963 IndexAccessor indexAccessor, long localLastGeneration) {
964
965 List<Address> clusterNodeAddresses =
966 ClusterExecutorUtil.getClusterNodeAddresses();
967
968 int clusterNodeAddressesCount = clusterNodeAddresses.size();
969
970 if (clusterNodeAddressesCount <= 1) {
971 if (_log.isDebugEnabled()) {
972 _log.debug(
973 "Do not load indexes because there is either one portal " +
974 "instance or no portal instances in the cluster");
975 }
976
977 return;
978 }
979
980 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
981 new MethodHandler(
982 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
983 true);
984
985 ClusterExecutorUtil.execute(
986 clusterRequest,
987 new LoadIndexClusterResponseCallback(
988 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
989 }
990
// Timeout, in milliseconds, used when polling for cluster node responses

private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
	PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

// Maximum clause count applied to BooleanQuery in the constructor; falls
// back to Lucene's current maximum when the property is not set

private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
	GetterUtil.getInteger(
		PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
		BooleanQuery.getMaxClauseCount());

private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

// Remote methods invoked on other cluster nodes

private static MethodKey _createTokenMethodKey = new MethodKey(
	TransientTokenUtil.class, "createToken", long.class);
private static MethodKey _getLastGenerationMethodKey = new MethodKey(
	LuceneHelperUtil.class, "getLastGeneration", long.class);

private Analyzer _analyzer;

// Index accessors created so far, keyed by company ID

private Map<Long, IndexAccessor> _indexAccessors =
	new ConcurrentHashMap<Long, IndexAccessor>();

// Only assigned when loading the index from the cluster is enabled

private LoadIndexClusterEventListener _loadIndexClusterEventListener;

// Executor running the startup reindex; may be null until first needed

private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;

// Protocol (HTTP or HTTPS) used to build the dump URL of a cluster node

private String _protocol;
private Version _version;
1013
1014 private static class ShutdownSyncJob implements Runnable {
1015
1016 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1017 _countDownLatch = countDownLatch;
1018 }
1019
1020 @Override
1021 public void run() {
1022 _countDownLatch.countDown();
1023
1024 try {
1025 synchronized (this) {
1026 wait();
1027 }
1028 }
1029 catch (InterruptedException ie) {
1030 }
1031 }
1032
1033 private final CountDownLatch _countDownLatch;
1034
1035 }
1036
1037 private class LoadIndexClusterEventListener
1038 implements ClusterEventListener {
1039
1040 @Override
1041 public void processClusterEvent(ClusterEvent clusterEvent) {
1042 ClusterEventType clusterEventType =
1043 clusterEvent.getClusterEventType();
1044
1045 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1046 return;
1047 }
1048
1049 List<Address> clusterNodeAddresses =
1050 ClusterExecutorUtil.getClusterNodeAddresses();
1051 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1052
1053 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1054 if (_log.isDebugEnabled()) {
1055 _log.debug(
1056 "Number of original cluster members is greater than " +
1057 "one");
1058 }
1059
1060 return;
1061 }
1062
1063 long[] companyIds = PortalInstances.getCompanyIds();
1064
1065 for (long companyId : companyIds) {
1066 loadIndexes(companyId);
1067 }
1068
1069 loadIndexes(CompanyConstants.SYSTEM);
1070 }
1071
1072 private void loadIndexes(long companyId) {
1073 long lastGeneration = getLastGeneration(companyId);
1074
1075 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1076 return;
1077 }
1078
1079 try {
1080 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1081 }
1082 catch (Exception e) {
1083 _log.error(
1084 "Unable to load indexes for company " + companyId, e);
1085 }
1086 }
1087
1088 }
1089
1090 private class LoadIndexClusterResponseCallback
1091 extends BaseClusterResponseCallback {
1092
1093 public LoadIndexClusterResponseCallback(
1094 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1095 long localLastGeneration) {
1096
1097 _indexAccessor = indexAccessor;
1098 _clusterNodeAddressesCount = clusterNodeAddressesCount;
1099 _localLastGeneration = localLastGeneration;
1100
1101 _companyId = _indexAccessor.getCompanyId();
1102 }
1103
1104 @Override
1105 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1106 Address bootupAddress = null;
1107
1108 do {
1109 _clusterNodeAddressesCount--;
1110
1111 ClusterNodeResponse clusterNodeResponse = null;
1112
1113 try {
1114 clusterNodeResponse = blockingQueue.poll(
1115 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1116 TimeUnit.MILLISECONDS);
1117 }
1118 catch (Exception e) {
1119 _log.error("Unable to get cluster node response", e);
1120 }
1121
1122 if (clusterNodeResponse == null) {
1123 if (_log.isDebugEnabled()) {
1124 _log.debug(
1125 "Unable to get cluster node response in " +
1126 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1127 TimeUnit.MILLISECONDS);
1128 }
1129
1130 continue;
1131 }
1132
1133 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1134
1135 if (clusterNode.getPortalInetSocketAddress() != null) {
1136 try {
1137 long remoteLastGeneration =
1138 (Long)clusterNodeResponse.getResult();
1139
1140 if (remoteLastGeneration > _localLastGeneration) {
1141 bootupAddress = clusterNodeResponse.getAddress();
1142
1143 break;
1144 }
1145 }
1146 catch (Exception e) {
1147 if (_log.isDebugEnabled()) {
1148 _log.debug(
1149 "Suppress exception caused by remote method " +
1150 "invocation",
1151 e);
1152 }
1153
1154 continue;
1155 }
1156 }
1157 else if (_log.isDebugEnabled()) {
1158 _log.debug(
1159 "Cluster node " + clusterNode +
1160 " has invalid InetSocketAddress");
1161 }
1162 }
1163 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1164
1165 if (bootupAddress == null) {
1166 return;
1167 }
1168
1169 if (_log.isInfoEnabled()) {
1170 _log.info(
1171 "Start loading lucene index files from cluster node " +
1172 bootupAddress);
1173 }
1174
1175 InputStream inputStream = null;
1176
1177 try {
1178 inputStream = getLoadIndexesInputStreamFromCluster(
1179 _companyId, bootupAddress);
1180
1181 _indexAccessor.loadIndex(inputStream);
1182
1183 if (_log.isInfoEnabled()) {
1184 _log.info("Lucene index files loaded successfully");
1185 }
1186 }
1187 catch (Exception e) {
1188 _log.error("Unable to load index for company " + _companyId, e);
1189 }
1190 finally {
1191 if (inputStream != null) {
1192 try {
1193 inputStream.close();
1194 }
1195 catch (IOException ioe) {
1196 _log.error(
1197 "Unable to close input stream for company " +
1198 _companyId,
1199 ioe);
1200 }
1201 }
1202 }
1203 }
1204
1205 @Override
1206 public void processTimeoutException(TimeoutException timeoutException) {
1207 _log.error(
1208 "Unable to load index for company " + _companyId,
1209 timeoutException);
1210 }
1211
1212 private int _clusterNodeAddressesCount;
1213 private long _companyId;
1214 private IndexAccessor _indexAccessor;
1215 private long _localLastGeneration;
1216
1217 }
1218
1219 }