001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterLink;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.messaging.Destination;
035 import com.liferay.portal.kernel.messaging.MessageBus;
036 import com.liferay.portal.kernel.messaging.MessageBusUtil;
037 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038 import com.liferay.portal.kernel.search.BooleanClauseOccur;
039 import com.liferay.portal.kernel.search.Field;
040 import com.liferay.portal.kernel.search.SearchEngineUtil;
041 import com.liferay.portal.kernel.util.ArrayUtil;
042 import com.liferay.portal.kernel.util.GetterUtil;
043 import com.liferay.portal.kernel.util.MethodHandler;
044 import com.liferay.portal.kernel.util.MethodKey;
045 import com.liferay.portal.kernel.util.ObjectValuePair;
046 import com.liferay.portal.kernel.util.PropsKeys;
047 import com.liferay.portal.kernel.util.StringBundler;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.kernel.util.StringUtil;
050 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051 import com.liferay.portal.kernel.util.Validator;
052 import com.liferay.portal.model.CompanyConstants;
053 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055 import com.liferay.portal.security.auth.TransientTokenUtil;
056 import com.liferay.portal.util.PortalInstances;
057 import com.liferay.portal.util.PortalUtil;
058 import com.liferay.portal.util.PropsUtil;
059 import com.liferay.portal.util.PropsValues;
060
061 import java.io.IOException;
062 import java.io.InputStream;
063 import java.io.OutputStream;
064
065 import java.net.InetAddress;
066 import java.net.URL;
067 import java.net.URLConnection;
068
069 import java.util.HashSet;
070 import java.util.List;
071 import java.util.Map;
072 import java.util.Set;
073 import java.util.concurrent.BlockingQueue;
074 import java.util.concurrent.ConcurrentHashMap;
075 import java.util.concurrent.CountDownLatch;
076 import java.util.concurrent.TimeUnit;
077
078 import org.apache.commons.lang.time.StopWatch;
079 import org.apache.lucene.analysis.Analyzer;
080 import org.apache.lucene.analysis.TokenStream;
081 import org.apache.lucene.document.Document;
082 import org.apache.lucene.index.IndexReader;
083 import org.apache.lucene.index.Term;
084 import org.apache.lucene.queryParser.QueryParser;
085 import org.apache.lucene.search.BooleanClause;
086 import org.apache.lucene.search.BooleanQuery;
087 import org.apache.lucene.search.IndexSearcher;
088 import org.apache.lucene.search.NumericRangeQuery;
089 import org.apache.lucene.search.Query;
090 import org.apache.lucene.search.TermQuery;
091 import org.apache.lucene.search.TermRangeQuery;
092 import org.apache.lucene.search.WildcardQuery;
093 import org.apache.lucene.search.highlight.Formatter;
094 import org.apache.lucene.search.highlight.Highlighter;
095 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096 import org.apache.lucene.search.highlight.QueryScorer;
097 import org.apache.lucene.search.highlight.SimpleFragmenter;
098 import org.apache.lucene.search.highlight.WeightedTerm;
099 import org.apache.lucene.util.Version;
100
101
110 public class LuceneHelperImpl implements LuceneHelper {
111
112 @Override
113 public void addDocument(long companyId, Document document)
114 throws IOException {
115
116 IndexAccessor indexAccessor = getIndexAccessor(companyId);
117
118 indexAccessor.addDocument(document);
119 }
120
121 @Override
122 public void addExactTerm(
123 BooleanQuery booleanQuery, String field, String value) {
124
125 addTerm(booleanQuery, field, value, false);
126 }
127
128 @Override
129 public void addNumericRangeTerm(
130 BooleanQuery booleanQuery, String field, Integer startValue,
131 Integer endValue) {
132
133 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134 field, startValue, endValue, true, true);
135
136 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137 }
138
139 @Override
140 public void addNumericRangeTerm(
141 BooleanQuery booleanQuery, String field, Long startValue,
142 Long endValue) {
143
144 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145 field, startValue, endValue, true, true);
146
147 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148 }
149
150
154 @Override
155 public void addNumericRangeTerm(
156 BooleanQuery booleanQuery, String field, String startValue,
157 String endValue) {
158
159 addNumericRangeTerm(
160 booleanQuery, field, GetterUtil.getLong(startValue),
161 GetterUtil.getLong(endValue));
162 }
163
164 @Override
165 public void addRangeTerm(
166 BooleanQuery booleanQuery, String field, String startValue,
167 String endValue) {
168
169 boolean includesLower = true;
170
171 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172 includesLower = false;
173 }
174
175 boolean includesUpper = true;
176
177 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178 includesUpper = false;
179 }
180
181 TermRangeQuery termRangeQuery = new TermRangeQuery(
182 field, startValue, endValue, includesLower, includesUpper);
183
184 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185 }
186
187 @Override
188 public void addRequiredTerm(
189 BooleanQuery booleanQuery, String field, String value, boolean like) {
190
191 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192 }
193
194 @Override
195 public void addRequiredTerm(
196 BooleanQuery booleanQuery, String field, String[] values,
197 boolean like) {
198
199 if (values == null) {
200 return;
201 }
202
203 BooleanQuery query = new BooleanQuery();
204
205 for (String value : values) {
206 addTerm(query, field, value, like);
207 }
208
209 booleanQuery.add(query, BooleanClause.Occur.MUST);
210 }
211
212 @Override
213 public void addTerm(
214 BooleanQuery booleanQuery, String field, String value, boolean like) {
215
216 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217 }
218
219 @Override
220 public void addTerm(
221 BooleanQuery booleanQuery, String field, String value, boolean like,
222 BooleanClauseOccur booleanClauseOccur) {
223
224 if (Validator.isNull(value)) {
225 return;
226 }
227
228 Analyzer analyzer = getAnalyzer();
229
230 if (analyzer instanceof PerFieldAnalyzer) {
231 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232
233 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234
235 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236 like = true;
237 }
238 }
239
240 if (like) {
241 value = StringUtil.replace(
242 value, StringPool.PERCENT, StringPool.BLANK);
243 }
244
245 try {
246 QueryParser queryParser = new QueryParser(
247 getVersion(), field, analyzer);
248
249 value = StringUtil.replace(
250 value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
251
252 Query query = queryParser.parse(value);
253
254 BooleanClause.Occur occur = null;
255
256 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
257 occur = BooleanClause.Occur.MUST;
258 }
259 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
260 occur = BooleanClause.Occur.MUST_NOT;
261 }
262 else {
263 occur = BooleanClause.Occur.SHOULD;
264 }
265
266 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
267 }
268 catch (Exception e) {
269 if (_log.isWarnEnabled()) {
270 _log.warn(e, e);
271 }
272 }
273 }
274
275 @Override
276 public void addTerm(
277 BooleanQuery booleanQuery, String field, String[] values,
278 boolean like) {
279
280 for (String value : values) {
281 addTerm(booleanQuery, field, value, like);
282 }
283 }
284
285
289 @Deprecated
290 @Override
291 public void cleanUp(IndexSearcher indexSearcher) {
292 if (indexSearcher == null) {
293 return;
294 }
295
296 try {
297 indexSearcher.close();
298
299 IndexReader indexReader = indexSearcher.getIndexReader();
300
301 if (indexReader != null) {
302 indexReader.close();
303 }
304 }
305 catch (IOException ioe) {
306 _log.error(ioe, ioe);
307 }
308 }
309
310 @Override
311 public int countScoredFieldNames(Query query, String[] filedNames) {
312 int count = 0;
313
314 for (String fieldName : filedNames) {
315 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
316 query, false, fieldName);
317
318 if ((weightedTerms.length > 0) &&
319 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
320
321 count++;
322 }
323 }
324
325 return count;
326 }
327
328 @Override
329 public void delete(long companyId) {
330 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
331
332 if (indexAccessor == null) {
333 return;
334 }
335
336 indexAccessor.delete();
337 }
338
339 @Override
340 public void deleteDocuments(long companyId, Term term) throws IOException {
341 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342
343 if (indexAccessor == null) {
344 return;
345 }
346
347 indexAccessor.deleteDocuments(term);
348 }
349
350 @Override
351 public void dumpIndex(long companyId, OutputStream outputStream)
352 throws IOException {
353
354 long lastGeneration = getLastGeneration(companyId);
355
356 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
357 if (_log.isDebugEnabled()) {
358 _log.debug(
359 "Dump index from cluster is not enabled for " + companyId);
360 }
361
362 return;
363 }
364
365 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
366
367 if (indexAccessor == null) {
368 return;
369 }
370
371 indexAccessor.dumpIndex(outputStream);
372 }
373
374 @Override
375 public Analyzer getAnalyzer() {
376 return _analyzer;
377 }
378
379 @Override
380 public IndexAccessor getIndexAccessor(long companyId) {
381 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
382
383 if (indexAccessor != null) {
384 return indexAccessor;
385 }
386
387 synchronized (this) {
388 indexAccessor = _indexAccessors.get(companyId);
389
390 if (indexAccessor == null) {
391 indexAccessor = new IndexAccessorImpl(companyId);
392
393 if (isLoadIndexFromClusterEnabled()) {
394 indexAccessor = new SynchronizedIndexAccessorImpl(
395 indexAccessor);
396
397 boolean clusterForwardMessage = GetterUtil.getBoolean(
398 MessageValuesThreadLocal.getValue(
399 ClusterLink.CLUSTER_FORWARD_MESSAGE));
400
401 if (clusterForwardMessage) {
402 if (_log.isInfoEnabled()) {
403 _log.info(
404 "Skip Luncene index files cluster loading " +
405 "since this is a manual reindex request");
406 }
407 }
408 else {
409 try {
410 _loadIndexFromCluster(
411 indexAccessor,
412 indexAccessor.getLastGeneration());
413 }
414 catch (Exception e) {
415 _log.error(
416 "Unable to load index for company " +
417 indexAccessor.getCompanyId(),
418 e);
419 }
420 }
421 }
422
423 _indexAccessors.put(companyId, indexAccessor);
424 }
425 }
426
427 return indexAccessor;
428 }
429
430 @Override
431 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
432 IndexAccessor indexAccessor = getIndexAccessor(companyId);
433
434 return indexAccessor.acquireIndexSearcher();
435 }
436
437 @Override
438 public long getLastGeneration(long companyId) {
439 if (!isLoadIndexFromClusterEnabled()) {
440 return IndexAccessor.DEFAULT_LAST_GENERATION;
441 }
442
443 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
444
445 if (indexAccessor == null) {
446 return IndexAccessor.DEFAULT_LAST_GENERATION;
447 }
448
449 return indexAccessor.getLastGeneration();
450 }
451
452 @Override
453 public InputStream getLoadIndexesInputStreamFromCluster(
454 long companyId, Address bootupAddress)
455 throws SystemException {
456
457 if (!isLoadIndexFromClusterEnabled()) {
458 return null;
459 }
460
461 InputStream inputStream = null;
462
463 try {
464 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
465 _getBootupClusterNodeObjectValuePair(bootupAddress);
466
467 URL url = bootupClusterNodeObjectValuePair.getValue();
468
469 URLConnection urlConnection = url.openConnection();
470
471 urlConnection.setDoOutput(true);
472
473 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
474 urlConnection.getOutputStream());
475
476 unsyncPrintWriter.write("transientToken=");
477 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
478 unsyncPrintWriter.write("&companyId=");
479 unsyncPrintWriter.write(String.valueOf(companyId));
480
481 unsyncPrintWriter.close();
482
483 inputStream = urlConnection.getInputStream();
484
485 return inputStream;
486 }
487 catch (IOException ioe) {
488 throw new SystemException(ioe);
489 }
490 }
491
492 @Override
493 public Set<String> getQueryTerms(Query query) {
494 String queryString = StringUtil.replace(
495 query.toString(), StringPool.STAR, StringPool.BLANK);
496
497 Query tempQuery = null;
498
499 try {
500 QueryParser queryParser = new QueryParser(
501 getVersion(), StringPool.BLANK, getAnalyzer());
502
503 tempQuery = queryParser.parse(queryString);
504 }
505 catch (Exception e) {
506 if (_log.isWarnEnabled()) {
507 _log.warn("Unable to parse " + queryString);
508 }
509
510 tempQuery = query;
511 }
512
513 WeightedTerm[] weightedTerms = null;
514
515 for (String fieldName : Field.KEYWORDS) {
516 weightedTerms = QueryTermExtractor.getTerms(
517 tempQuery, false, fieldName);
518
519 if (weightedTerms.length > 0) {
520 break;
521 }
522 }
523
524 Set<String> queryTerms = new HashSet<String>();
525
526 for (WeightedTerm weightedTerm : weightedTerms) {
527 queryTerms.add(weightedTerm.getTerm());
528 }
529
530 return queryTerms;
531 }
532
533
536 @Deprecated
537 @Override
538 public IndexSearcher getSearcher(long companyId, boolean readOnly)
539 throws IOException {
540
541 IndexAccessor indexAccessor = getIndexAccessor(companyId);
542
543 IndexReader indexReader = IndexReader.open(
544 indexAccessor.getLuceneDir(), readOnly);
545
546 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
547
548 indexSearcher.setDefaultFieldSortScoring(true, false);
549 indexSearcher.setSimilarity(new FieldWeightSimilarity());
550
551 return indexSearcher;
552 }
553
554 @Override
555 public String getSnippet(
556 Query query, String field, String s, int maxNumFragments,
557 int fragmentLength, String fragmentSuffix, Formatter formatter)
558 throws IOException {
559
560 QueryScorer queryScorer = new QueryScorer(query, field);
561
562 Highlighter highlighter = new Highlighter(formatter, queryScorer);
563
564 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
565
566 TokenStream tokenStream = getAnalyzer().tokenStream(
567 field, new UnsyncStringReader(s));
568
569 try {
570 String snippet = highlighter.getBestFragments(
571 tokenStream, s, maxNumFragments, fragmentSuffix);
572
573 if (Validator.isNotNull(snippet) &&
574 !StringUtil.endsWith(snippet, fragmentSuffix) &&
575 !s.equals(snippet)) {
576
577 snippet = snippet.concat(fragmentSuffix);
578 }
579
580 return snippet;
581 }
582 catch (InvalidTokenOffsetsException itoe) {
583 throw new IOException(itoe.getMessage());
584 }
585 }
586
587 @Override
588 public Version getVersion() {
589 return _version;
590 }
591
592 @Override
593 public boolean isLoadIndexFromClusterEnabled() {
594 if (PropsValues.CLUSTER_LINK_ENABLED &&
595 PropsValues.LUCENE_REPLICATE_WRITE) {
596
597 return true;
598 }
599
600 if (_log.isDebugEnabled()) {
601 _log.debug("Load index from cluster is not enabled");
602 }
603
604 return false;
605 }
606
607 @Override
608 public void loadIndex(long companyId, InputStream inputStream)
609 throws IOException {
610
611 if (!isLoadIndexFromClusterEnabled()) {
612 return;
613 }
614
615 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
616
617 if (indexAccessor == null) {
618 if (_log.isInfoEnabled()) {
619 _log.info(
620 "Skip loading Lucene index files for company " + companyId +
621 " in favor of lazy loading");
622 }
623
624 return;
625 }
626
627 StopWatch stopWatch = new StopWatch();
628
629 stopWatch.start();
630
631 if (_log.isInfoEnabled()) {
632 _log.info(
633 "Start loading Lucene index files for company " + companyId);
634 }
635
636 indexAccessor.loadIndex(inputStream);
637
638 if (_log.isInfoEnabled()) {
639 _log.info(
640 "Finished loading index files for company " + companyId +
641 " in " + stopWatch.getTime() + " ms");
642 }
643 }
644
645 @Override
646 public void loadIndexesFromCluster(long companyId) throws SystemException {
647 if (!isLoadIndexFromClusterEnabled()) {
648 return;
649 }
650
651 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652
653 if (indexAccessor == null) {
654 return;
655 }
656
657 long localLastGeneration = getLastGeneration(companyId);
658
659 _loadIndexFromCluster(indexAccessor, localLastGeneration);
660 }
661
662 @Override
663 public void releaseIndexSearcher(
664 long companyId, IndexSearcher indexSearcher)
665 throws IOException {
666
667 IndexAccessor indexAccessor = getIndexAccessor(companyId);
668
669 indexAccessor.releaseIndexSearcher(indexSearcher);
670 }
671
672 public void setAnalyzer(Analyzer analyzer) {
673 _analyzer = analyzer;
674 }
675
676 public void setVersion(Version version) {
677 _version = version;
678 }
679
680 @Override
681 public void shutdown() {
682 if (_luceneIndexThreadPoolExecutor != null) {
683 _luceneIndexThreadPoolExecutor.shutdownNow();
684
685 try {
686 _luceneIndexThreadPoolExecutor.awaitTermination(
687 60, TimeUnit.SECONDS);
688 }
689 catch (InterruptedException ie) {
690 _log.error("Lucene indexer shutdown interrupted", ie);
691 }
692 }
693
694 if (isLoadIndexFromClusterEnabled()) {
695 ClusterExecutorUtil.removeClusterEventListener(
696 _loadIndexClusterEventListener);
697 }
698
699 MessageBus messageBus = MessageBusUtil.getMessageBus();
700
701 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
702 String searchWriterDestinationName =
703 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
704
705 Destination searchWriteDestination = messageBus.getDestination(
706 searchWriterDestinationName);
707
708 if (searchWriteDestination != null) {
709 ThreadPoolExecutor threadPoolExecutor =
710 PortalExecutorManagerUtil.getPortalExecutor(
711 searchWriterDestinationName);
712
713 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
714
715 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
716
717 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
718 countDownLatch);
719
720 for (int i = 0; i < maxPoolSize; i++) {
721 threadPoolExecutor.submit(shutdownSyncJob);
722 }
723
724 try {
725 countDownLatch.await();
726 }
727 catch (InterruptedException ie) {
728 _log.error("Shutdown waiting interrupted", ie);
729 }
730
731 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
732
733 if (_log.isDebugEnabled()) {
734 _log.debug(
735 "Cancelled appending indexing jobs: " + runnables);
736 }
737
738 searchWriteDestination.close(true);
739 }
740 }
741
742 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
743 indexAccessor.close();
744 }
745 }
746
747 @Override
748 public void shutdown(long companyId) {
749 IndexAccessor indexAccessor = getIndexAccessor(companyId);
750
751 _indexAccessors.remove(indexAccessor);
752
753 indexAccessor.close();
754 }
755
756 @Override
757 public void startup(long companyId) {
758 if (!PropsValues.INDEX_ON_STARTUP) {
759 return;
760 }
761
762 if (_log.isInfoEnabled()) {
763 _log.info("Indexing Lucene on startup");
764 }
765
766 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
767
768 if (PropsValues.INDEX_WITH_THREAD) {
769 if (_luceneIndexThreadPoolExecutor == null) {
770
771
772
773
774
775 _luceneIndexThreadPoolExecutor =
776 PortalExecutorManagerUtil.getPortalExecutor(
777 LuceneHelperImpl.class.getName());
778 }
779
780 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
781 }
782 else {
783 luceneIndexer.reindex();
784 }
785 }
786
787 @Override
788 public void updateDocument(long companyId, Term term, Document document)
789 throws IOException {
790
791 IndexAccessor indexAccessor = getIndexAccessor(companyId);
792
793 indexAccessor.updateDocument(term, document);
794 }
795
/**
 * Obtains the indexer executor when startup indexing in a thread is enabled,
 * registers the cluster event listener when loading indexes from the cluster
 * is enabled, and applies the configured maximum boolean clause count to
 * Lucene.
 */
private LuceneHelperImpl() {
    boolean indexInThreadOnStartup =
        PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

    if (indexInThreadOnStartup) {
        _luceneIndexThreadPoolExecutor =
            PortalExecutorManagerUtil.getPortalExecutor(
                LuceneHelperImpl.class.getName());
    }

    if (isLoadIndexFromClusterEnabled()) {
        _loadIndexClusterEventListener =
            new LoadIndexClusterEventListener();

        ClusterExecutorUtil.addClusterEventListener(
            _loadIndexClusterEventListener);
    }

    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
}
813
814 private ObjectValuePair<String, URL>
815 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
816 throws SystemException {
817
818 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
819 new MethodHandler(
820 _createTokenMethodKey,
821 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
822 bootupAddress);
823
824 FutureClusterResponses futureClusterResponses =
825 ClusterExecutorUtil.execute(clusterRequest);
826
827 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
828 futureClusterResponses.getPartialResults();
829
830 try {
831 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
832 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
833 TimeUnit.MILLISECONDS);
834
835 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
836
837 int port = clusterNode.getPort();
838
839 if (port <= 0) {
840 StringBundler sb = new StringBundler(6);
841
842 sb.append("Invalid cluster node port ");
843 sb.append(port);
844 sb.append(". The port is set by the first request or ");
845 sb.append("configured in portal.properties by the properties ");
846 sb.append("\"portal.instance.http.port\" and ");
847 sb.append("\"portal.instance.https.port\".");
848
849 throw new Exception(sb.toString());
850 }
851
852 String protocol = clusterNode.getPortalProtocol();
853
854 if (Validator.isNull(protocol)) {
855 StringBundler sb = new StringBundler(4);
856
857 sb.append("Cluster node protocol is empty. The protocol is ");
858 sb.append("set by the first request or configured in ");
859 sb.append("portal.properties by the property ");
860 sb.append("\"portal.instance.protocol\"");
861
862 throw new Exception(sb.toString());
863 }
864
865 InetAddress inetAddress = clusterNode.getInetAddress();
866
867 String hostName = null;
868
869 if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
870 hostName = inetAddress.getCanonicalHostName();
871 }
872 else {
873 hostName = inetAddress.getHostAddress();
874 }
875
876 String fileName = PortalUtil.getPathContext();
877
878 if (!fileName.endsWith(StringPool.SLASH)) {
879 fileName = fileName.concat(StringPool.SLASH);
880 }
881
882 fileName = fileName.concat("lucene/dump");
883
884 URL url = new URL(protocol, hostName, port, fileName);
885
886 String transientToken = (String)clusterNodeResponse.getResult();
887
888 return new ObjectValuePair<String, URL>(transientToken, url);
889 }
890 catch (Exception e) {
891 throw new SystemException(e);
892 }
893 }
894
895 private void _handleFutureClusterResponses(
896 FutureClusterResponses futureClusterResponses,
897 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
898 long localLastGeneration) {
899
900 BlockingQueue<ClusterNodeResponse> blockingQueue =
901 futureClusterResponses.getPartialResults();
902
903 long companyId = indexAccessor.getCompanyId();
904
905 Address bootupAddress = null;
906
907 do {
908 clusterNodeAddressesCount--;
909
910 ClusterNodeResponse clusterNodeResponse = null;
911
912 try {
913 clusterNodeResponse = blockingQueue.poll(
914 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
915 TimeUnit.MILLISECONDS);
916 }
917 catch (Exception e) {
918 _log.error("Unable to get cluster node response", e);
919 }
920
921 if (clusterNodeResponse == null) {
922 if (_log.isDebugEnabled()) {
923 _log.debug(
924 "Unable to get cluster node response in " +
925 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
926 TimeUnit.MILLISECONDS);
927 }
928
929 continue;
930 }
931
932 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
933
934 if (clusterNode.getPort() > 0) {
935 try {
936 long remoteLastGeneration =
937 (Long)clusterNodeResponse.getResult();
938
939 if (remoteLastGeneration > localLastGeneration) {
940 bootupAddress = clusterNodeResponse.getAddress();
941
942 break;
943 }
944 }
945 catch (Exception e) {
946 if (_log.isDebugEnabled()) {
947 _log.debug(
948 "Suppress exception caused by remote method " +
949 "invocation",
950 e);
951 }
952
953 continue;
954 }
955 }
956 else {
957 if (_log.isDebugEnabled()) {
958 _log.debug(
959 "Cluster node " + clusterNode +
960 " has invalid port");
961 }
962 }
963 }
964 while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
965
966 if (bootupAddress == null) {
967 return;
968 }
969
970 if (_log.isInfoEnabled()) {
971 _log.info(
972 "Start loading lucene index files from cluster node " +
973 bootupAddress);
974 }
975
976 InputStream inputStream = null;
977
978 try {
979 inputStream = getLoadIndexesInputStreamFromCluster(
980 companyId, bootupAddress);
981
982 indexAccessor.loadIndex(inputStream);
983
984 if (_log.isInfoEnabled()) {
985 _log.info("Lucene index files loaded successfully");
986 }
987 }
988 catch (Exception e) {
989 _log.error("Unable to load index for company " + companyId, e);
990 }
991 finally {
992 if (inputStream != null) {
993 try {
994 inputStream.close();
995 }
996 catch (IOException ioe) {
997 _log.error(
998 "Unable to close input stream for company " +
999 companyId,
1000 ioe);
1001 }
1002 }
1003 }
1004 }
1005
1006 private void _includeIfUnique(
1007 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1008 Query query, BooleanClause.Occur occur) {
1009
1010 if (query instanceof TermQuery) {
1011 Set<Term> terms = new HashSet<Term>();
1012
1013 TermQuery termQuery = (TermQuery)query;
1014
1015 termQuery.extractTerms(terms);
1016
1017 float boost = termQuery.getBoost();
1018
1019 for (Term term : terms) {
1020 String termValue = term.text();
1021
1022 if (like) {
1023 termValue = termValue.toLowerCase(queryParser.getLocale());
1024
1025 term = term.createTerm(
1026 StringPool.STAR.concat(termValue).concat(
1027 StringPool.STAR));
1028
1029 query = new WildcardQuery(term);
1030 }
1031 else {
1032 query = new TermQuery(term);
1033 }
1034
1035 query.setBoost(boost);
1036
1037 boolean included = false;
1038
1039 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1040 if (query.equals(booleanClause.getQuery())) {
1041 included = true;
1042 }
1043 }
1044
1045 if (!included) {
1046 booleanQuery.add(query, occur);
1047 }
1048 }
1049 }
1050 else if (query instanceof BooleanQuery) {
1051 BooleanQuery curBooleanQuery = (BooleanQuery)query;
1052
1053 BooleanQuery containerBooleanQuery = new BooleanQuery();
1054
1055 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1056 _includeIfUnique(
1057 containerBooleanQuery, like, queryParser,
1058 booleanClause.getQuery(), booleanClause.getOccur());
1059 }
1060
1061 if (containerBooleanQuery.getClauses().length > 0) {
1062 booleanQuery.add(containerBooleanQuery, occur);
1063 }
1064 }
1065 else {
1066 boolean included = false;
1067
1068 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1069 if (query.equals(booleanClause.getQuery())) {
1070 included = true;
1071 }
1072 }
1073
1074 if (!included) {
1075 booleanQuery.add(query, occur);
1076 }
1077 }
1078 }
1079
1080 private void _loadIndexFromCluster(
1081 IndexAccessor indexAccessor, long localLastGeneration)
1082 throws SystemException {
1083
1084 List<Address> clusterNodeAddresses =
1085 ClusterExecutorUtil.getClusterNodeAddresses();
1086
1087 int clusterNodeAddressesCount = clusterNodeAddresses.size();
1088
1089 if (clusterNodeAddressesCount <= 1) {
1090 if (_log.isDebugEnabled()) {
1091 _log.debug(
1092 "Do not load indexes because there is either one portal " +
1093 "instance or no portal instances in the cluster");
1094 }
1095
1096 return;
1097 }
1098
1099 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1100 new MethodHandler(
1101 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1102 true);
1103
1104 FutureClusterResponses futureClusterResponses =
1105 ClusterExecutorUtil.execute(clusterRequest);
1106
1107 _handleFutureClusterResponses(
1108 futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1109 localLastGeneration);
1110 }
1111
1112 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1113 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1114
1115 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1116 GetterUtil.getInteger(
1117 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1118 BooleanQuery.getMaxClauseCount());
1119
1120 private static final String[] _KEYWORDS_LOWERCASE = {
1121 " and ", " not ", " or "
1122 };
1123
1124 private static final String[] _KEYWORDS_UPPERCASE = {
1125 " AND ", " NOT ", " OR "
1126 };
1127
1128 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1129
1130 private static MethodKey _createTokenMethodKey = new MethodKey(
1131 TransientTokenUtil.class, "createToken", long.class);
1132 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1133 LuceneHelperUtil.class, "getLastGeneration", long.class);
1134
1135 private Analyzer _analyzer;
1136 private Map<Long, IndexAccessor> _indexAccessors =
1137 new ConcurrentHashMap<Long, IndexAccessor>();
1138 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1139 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1140 private Version _version;
1141
1142 private static class ShutdownSyncJob implements Runnable {
1143
1144 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1145 _countDownLatch = countDownLatch;
1146 }
1147
1148 @Override
1149 public void run() {
1150 _countDownLatch.countDown();
1151
1152 try {
1153 synchronized (this) {
1154 wait();
1155 }
1156 }
1157 catch (InterruptedException ie) {
1158 }
1159 }
1160
1161 private final CountDownLatch _countDownLatch;
1162
1163 }
1164
1165 private class LoadIndexClusterEventListener
1166 implements ClusterEventListener {
1167
1168 @Override
1169 public void processClusterEvent(ClusterEvent clusterEvent) {
1170 ClusterEventType clusterEventType =
1171 clusterEvent.getClusterEventType();
1172
1173 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1174 return;
1175 }
1176
1177 List<Address> clusterNodeAddresses =
1178 ClusterExecutorUtil.getClusterNodeAddresses();
1179 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1180
1181 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1182 if (_log.isDebugEnabled()) {
1183 _log.debug(
1184 "Number of original cluster members is greater than " +
1185 "one");
1186 }
1187
1188 return;
1189 }
1190
1191 long[] companyIds = PortalInstances.getCompanyIds();
1192
1193 for (long companyId : companyIds) {
1194 loadIndexes(companyId);
1195 }
1196
1197 loadIndexes(CompanyConstants.SYSTEM);
1198 }
1199
1200 private void loadIndexes(long companyId) {
1201 long lastGeneration = getLastGeneration(companyId);
1202
1203 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1204 return;
1205 }
1206
1207 try {
1208 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1209 }
1210 catch (Exception e) {
1211 _log.error(
1212 "Unable to load indexes for company " + companyId, e);
1213 }
1214 }
1215
1216 }
1217
1218 }