001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Destination;
036 import com.liferay.portal.kernel.messaging.MessageBus;
037 import com.liferay.portal.kernel.messaging.MessageBusUtil;
038 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039 import com.liferay.portal.kernel.search.BooleanClauseOccur;
040 import com.liferay.portal.kernel.search.Field;
041 import com.liferay.portal.kernel.search.SearchEngineUtil;
042 import com.liferay.portal.kernel.util.ArrayUtil;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.Http;
045 import com.liferay.portal.kernel.util.MethodHandler;
046 import com.liferay.portal.kernel.util.MethodKey;
047 import com.liferay.portal.kernel.util.ObjectValuePair;
048 import com.liferay.portal.kernel.util.PropsKeys;
049 import com.liferay.portal.kernel.util.StringBundler;
050 import com.liferay.portal.kernel.util.StringPool;
051 import com.liferay.portal.kernel.util.StringUtil;
052 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053 import com.liferay.portal.kernel.util.Validator;
054 import com.liferay.portal.model.CompanyConstants;
055 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057 import com.liferay.portal.security.auth.TransientTokenUtil;
058 import com.liferay.portal.util.PortalInstances;
059 import com.liferay.portal.util.PortalUtil;
060 import com.liferay.portal.util.PropsUtil;
061 import com.liferay.portal.util.PropsValues;
062
063 import java.io.IOException;
064 import java.io.InputStream;
065 import java.io.OutputStream;
066
067 import java.net.InetAddress;
068 import java.net.URL;
069 import java.net.URLConnection;
070
071 import java.util.HashSet;
072 import java.util.List;
073 import java.util.Map;
074 import java.util.Set;
075 import java.util.concurrent.BlockingQueue;
076 import java.util.concurrent.ConcurrentHashMap;
077 import java.util.concurrent.CountDownLatch;
078 import java.util.concurrent.TimeUnit;
079 import java.util.concurrent.TimeoutException;
080
081 import org.apache.commons.lang.time.StopWatch;
082 import org.apache.lucene.analysis.Analyzer;
083 import org.apache.lucene.analysis.TokenStream;
084 import org.apache.lucene.document.Document;
085 import org.apache.lucene.index.IndexReader;
086 import org.apache.lucene.index.Term;
087 import org.apache.lucene.queryParser.QueryParser;
088 import org.apache.lucene.search.BooleanClause;
089 import org.apache.lucene.search.BooleanQuery;
090 import org.apache.lucene.search.IndexSearcher;
091 import org.apache.lucene.search.NumericRangeQuery;
092 import org.apache.lucene.search.Query;
093 import org.apache.lucene.search.TermQuery;
094 import org.apache.lucene.search.TermRangeQuery;
095 import org.apache.lucene.search.WildcardQuery;
096 import org.apache.lucene.search.highlight.Formatter;
097 import org.apache.lucene.search.highlight.Highlighter;
098 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
099 import org.apache.lucene.search.highlight.QueryScorer;
100 import org.apache.lucene.search.highlight.SimpleFragmenter;
101 import org.apache.lucene.search.highlight.WeightedTerm;
102 import org.apache.lucene.util.Version;
103
104
113 public class LuceneHelperImpl implements LuceneHelper {
114
115 @Override
116 public void addDocument(long companyId, Document document)
117 throws IOException {
118
119 IndexAccessor indexAccessor = getIndexAccessor(companyId);
120
121 indexAccessor.addDocument(document);
122 }
123
124 @Override
125 public void addExactTerm(
126 BooleanQuery booleanQuery, String field, String value) {
127
128 addTerm(booleanQuery, field, value, false);
129 }
130
131 @Override
132 public void addNumericRangeTerm(
133 BooleanQuery booleanQuery, String field, Integer startValue,
134 Integer endValue) {
135
136 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
137 field, startValue, endValue, true, true);
138
139 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
140 }
141
142 @Override
143 public void addNumericRangeTerm(
144 BooleanQuery booleanQuery, String field, Long startValue,
145 Long endValue) {
146
147 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
148 field, startValue, endValue, true, true);
149
150 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
151 }
152
153
157 @Override
158 public void addNumericRangeTerm(
159 BooleanQuery booleanQuery, String field, String startValue,
160 String endValue) {
161
162 addNumericRangeTerm(
163 booleanQuery, field, GetterUtil.getLong(startValue),
164 GetterUtil.getLong(endValue));
165 }
166
167 @Override
168 public void addRangeTerm(
169 BooleanQuery booleanQuery, String field, String startValue,
170 String endValue) {
171
172 boolean includesLower = true;
173
174 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
175 includesLower = false;
176 }
177
178 boolean includesUpper = true;
179
180 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
181 includesUpper = false;
182 }
183
184 TermRangeQuery termRangeQuery = new TermRangeQuery(
185 field, startValue, endValue, includesLower, includesUpper);
186
187 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
188 }
189
190 @Override
191 public void addRequiredTerm(
192 BooleanQuery booleanQuery, String field, String value, boolean like) {
193
194 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
195 }
196
197 @Override
198 public void addRequiredTerm(
199 BooleanQuery booleanQuery, String field, String[] values,
200 boolean like) {
201
202 if (values == null) {
203 return;
204 }
205
206 BooleanQuery query = new BooleanQuery();
207
208 for (String value : values) {
209 addTerm(query, field, value, like);
210 }
211
212 booleanQuery.add(query, BooleanClause.Occur.MUST);
213 }
214
215 @Override
216 public void addTerm(
217 BooleanQuery booleanQuery, String field, String value, boolean like) {
218
219 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
220 }
221
222 @Override
223 public void addTerm(
224 BooleanQuery booleanQuery, String field, String value, boolean like,
225 BooleanClauseOccur booleanClauseOccur) {
226
227 if (Validator.isNull(value)) {
228 return;
229 }
230
231 Analyzer analyzer = getAnalyzer();
232
233 if (analyzer instanceof PerFieldAnalyzer) {
234 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
235
236 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
237
238 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
239 like = true;
240 }
241 }
242
243 if (like) {
244 value = StringUtil.replace(
245 value, StringPool.PERCENT, StringPool.BLANK);
246 }
247
248 try {
249 QueryParser queryParser = new QueryParser(
250 getVersion(), field, analyzer);
251
252 Query query = queryParser.parse(value);
253
254 BooleanClause.Occur occur = null;
255
256 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
257 occur = BooleanClause.Occur.MUST;
258 }
259 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
260 occur = BooleanClause.Occur.MUST_NOT;
261 }
262 else {
263 occur = BooleanClause.Occur.SHOULD;
264 }
265
266 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
267 }
268 catch (Exception e) {
269 if (_log.isWarnEnabled()) {
270 _log.warn(e, e);
271 }
272 }
273 }
274
275 @Override
276 public void addTerm(
277 BooleanQuery booleanQuery, String field, String[] values,
278 boolean like) {
279
280 for (String value : values) {
281 addTerm(booleanQuery, field, value, like);
282 }
283 }
284
285
289 @Deprecated
290 @Override
291 public void cleanUp(IndexSearcher indexSearcher) {
292 if (indexSearcher == null) {
293 return;
294 }
295
296 try {
297 indexSearcher.close();
298
299 IndexReader indexReader = indexSearcher.getIndexReader();
300
301 if (indexReader != null) {
302 indexReader.close();
303 }
304 }
305 catch (IOException ioe) {
306 _log.error(ioe, ioe);
307 }
308 }
309
310 @Override
311 public int countScoredFieldNames(Query query, String[] filedNames) {
312 int count = 0;
313
314 for (String fieldName : filedNames) {
315 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
316 query, false, fieldName);
317
318 if ((weightedTerms.length > 0) &&
319 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
320
321 count++;
322 }
323 }
324
325 return count;
326 }
327
328 @Override
329 public void delete(long companyId) {
330 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
331
332 if (indexAccessor == null) {
333 return;
334 }
335
336 indexAccessor.delete();
337 }
338
339 @Override
340 public void deleteDocuments(long companyId, Term term) throws IOException {
341 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342
343 if (indexAccessor == null) {
344 return;
345 }
346
347 indexAccessor.deleteDocuments(term);
348 }
349
350 @Override
351 public void dumpIndex(long companyId, OutputStream outputStream)
352 throws IOException {
353
354 long lastGeneration = getLastGeneration(companyId);
355
356 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
357 if (_log.isDebugEnabled()) {
358 _log.debug(
359 "Dump index from cluster is not enabled for " + companyId);
360 }
361
362 return;
363 }
364
365 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
366
367 if (indexAccessor == null) {
368 return;
369 }
370
371 indexAccessor.dumpIndex(outputStream);
372 }
373
374 @Override
375 public Analyzer getAnalyzer() {
376 return _analyzer;
377 }
378
379 @Override
380 public IndexAccessor getIndexAccessor(long companyId) {
381 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
382
383 if (indexAccessor != null) {
384 return indexAccessor;
385 }
386
387 synchronized (this) {
388 indexAccessor = _indexAccessors.get(companyId);
389
390 if (indexAccessor == null) {
391 indexAccessor = new IndexAccessorImpl(companyId);
392
393 if (isLoadIndexFromClusterEnabled()) {
394 indexAccessor = new SynchronizedIndexAccessorImpl(
395 indexAccessor);
396
397 boolean clusterForwardMessage = GetterUtil.getBoolean(
398 MessageValuesThreadLocal.getValue(
399 ClusterLink.CLUSTER_FORWARD_MESSAGE));
400
401 if (clusterForwardMessage) {
402 if (_log.isInfoEnabled()) {
403 _log.info(
404 "Skip Luncene index files cluster loading " +
405 "since this is a manual reindex request");
406 }
407 }
408 else {
409 try {
410 _loadIndexFromCluster(
411 indexAccessor,
412 indexAccessor.getLastGeneration());
413 }
414 catch (Exception e) {
415 _log.error(
416 "Unable to load index for company " +
417 indexAccessor.getCompanyId(),
418 e);
419 }
420 }
421 }
422
423 _indexAccessors.put(companyId, indexAccessor);
424 }
425 }
426
427 return indexAccessor;
428 }
429
430 @Override
431 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
432 IndexAccessor indexAccessor = getIndexAccessor(companyId);
433
434 return indexAccessor.acquireIndexSearcher();
435 }
436
437 @Override
438 public long getLastGeneration(long companyId) {
439 if (!isLoadIndexFromClusterEnabled()) {
440 return IndexAccessor.DEFAULT_LAST_GENERATION;
441 }
442
443 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
444
445 if (indexAccessor == null) {
446 return IndexAccessor.DEFAULT_LAST_GENERATION;
447 }
448
449 return indexAccessor.getLastGeneration();
450 }
451
452 @Override
453 public InputStream getLoadIndexesInputStreamFromCluster(
454 long companyId, Address bootupAddress)
455 throws SystemException {
456
457 if (!isLoadIndexFromClusterEnabled()) {
458 return null;
459 }
460
461 InputStream inputStream = null;
462
463 try {
464 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
465 _getBootupClusterNodeObjectValuePair(bootupAddress);
466
467 URL url = bootupClusterNodeObjectValuePair.getValue();
468
469 URLConnection urlConnection = url.openConnection();
470
471 urlConnection.setDoOutput(true);
472
473 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
474 urlConnection.getOutputStream());
475
476 unsyncPrintWriter.write("transientToken=");
477 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
478 unsyncPrintWriter.write("&companyId=");
479 unsyncPrintWriter.write(String.valueOf(companyId));
480
481 unsyncPrintWriter.close();
482
483 inputStream = urlConnection.getInputStream();
484
485 return inputStream;
486 }
487 catch (IOException ioe) {
488 throw new SystemException(ioe);
489 }
490 }
491
492 @Override
493 public Set<String> getQueryTerms(Query query) {
494 String queryString = StringUtil.replace(
495 query.toString(), StringPool.STAR, StringPool.BLANK);
496
497 Query tempQuery = null;
498
499 try {
500 QueryParser queryParser = new QueryParser(
501 getVersion(), StringPool.BLANK, getAnalyzer());
502
503 tempQuery = queryParser.parse(queryString);
504 }
505 catch (Exception e) {
506 if (_log.isWarnEnabled()) {
507 _log.warn("Unable to parse " + queryString);
508 }
509
510 tempQuery = query;
511 }
512
513 WeightedTerm[] weightedTerms = null;
514
515 for (String fieldName : Field.KEYWORDS) {
516 weightedTerms = QueryTermExtractor.getTerms(
517 tempQuery, false, fieldName);
518
519 if (weightedTerms.length > 0) {
520 break;
521 }
522 }
523
524 Set<String> queryTerms = new HashSet<String>();
525
526 for (WeightedTerm weightedTerm : weightedTerms) {
527 queryTerms.add(weightedTerm.getTerm());
528 }
529
530 return queryTerms;
531 }
532
533
536 @Deprecated
537 @Override
538 public IndexSearcher getSearcher(long companyId, boolean readOnly)
539 throws IOException {
540
541 IndexAccessor indexAccessor = getIndexAccessor(companyId);
542
543 IndexReader indexReader = IndexReader.open(
544 indexAccessor.getLuceneDir(), readOnly);
545
546 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
547
548 indexSearcher.setDefaultFieldSortScoring(true, false);
549 indexSearcher.setSimilarity(new FieldWeightSimilarity());
550
551 return indexSearcher;
552 }
553
554 @Override
555 public String getSnippet(
556 Query query, String field, String s, int maxNumFragments,
557 int fragmentLength, String fragmentSuffix, Formatter formatter)
558 throws IOException {
559
560 QueryScorer queryScorer = new QueryScorer(query, field);
561
562 Highlighter highlighter = new Highlighter(formatter, queryScorer);
563
564 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
565
566 TokenStream tokenStream = getAnalyzer().tokenStream(
567 field, new UnsyncStringReader(s));
568
569 try {
570 String snippet = highlighter.getBestFragments(
571 tokenStream, s, maxNumFragments, fragmentSuffix);
572
573 if (Validator.isNotNull(snippet) &&
574 !StringUtil.endsWith(snippet, fragmentSuffix) &&
575 !s.equals(snippet)) {
576
577 snippet = snippet.concat(fragmentSuffix);
578 }
579
580 return snippet;
581 }
582 catch (InvalidTokenOffsetsException itoe) {
583 throw new IOException(itoe.getMessage());
584 }
585 }
586
587 @Override
588 public Version getVersion() {
589 return _version;
590 }
591
592 @Override
593 public boolean isLoadIndexFromClusterEnabled() {
594 if (PropsValues.CLUSTER_LINK_ENABLED &&
595 PropsValues.LUCENE_REPLICATE_WRITE) {
596
597 return true;
598 }
599
600 if (_log.isDebugEnabled()) {
601 _log.debug("Load index from cluster is not enabled");
602 }
603
604 return false;
605 }
606
607 @Override
608 public void loadIndex(long companyId, InputStream inputStream)
609 throws IOException {
610
611 if (!isLoadIndexFromClusterEnabled()) {
612 return;
613 }
614
615 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
616
617 if (indexAccessor == null) {
618 if (_log.isInfoEnabled()) {
619 _log.info(
620 "Skip loading Lucene index files for company " + companyId +
621 " in favor of lazy loading");
622 }
623
624 return;
625 }
626
627 StopWatch stopWatch = new StopWatch();
628
629 stopWatch.start();
630
631 if (_log.isInfoEnabled()) {
632 _log.info(
633 "Start loading Lucene index files for company " + companyId);
634 }
635
636 indexAccessor.loadIndex(inputStream);
637
638 if (_log.isInfoEnabled()) {
639 _log.info(
640 "Finished loading index files for company " + companyId +
641 " in " + stopWatch.getTime() + " ms");
642 }
643 }
644
645 @Override
646 public void loadIndexesFromCluster(long companyId) throws SystemException {
647 if (!isLoadIndexFromClusterEnabled()) {
648 return;
649 }
650
651 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652
653 if (indexAccessor == null) {
654 return;
655 }
656
657 long localLastGeneration = getLastGeneration(companyId);
658
659 _loadIndexFromCluster(indexAccessor, localLastGeneration);
660 }
661
662 @Override
663 public void releaseIndexSearcher(
664 long companyId, IndexSearcher indexSearcher)
665 throws IOException {
666
667 IndexAccessor indexAccessor = getIndexAccessor(companyId);
668
669 indexAccessor.releaseIndexSearcher(indexSearcher);
670 }
671
672 public void setAnalyzer(Analyzer analyzer) {
673 _analyzer = analyzer;
674 }
675
676 public void setVersion(Version version) {
677 _version = version;
678 }
679
680 @Override
681 public void shutdown() {
682 if (_luceneIndexThreadPoolExecutor != null) {
683 _luceneIndexThreadPoolExecutor.shutdownNow();
684
685 try {
686 _luceneIndexThreadPoolExecutor.awaitTermination(
687 60, TimeUnit.SECONDS);
688 }
689 catch (InterruptedException ie) {
690 _log.error("Lucene indexer shutdown interrupted", ie);
691 }
692 }
693
694 if (isLoadIndexFromClusterEnabled()) {
695 ClusterExecutorUtil.removeClusterEventListener(
696 _loadIndexClusterEventListener);
697 }
698
699 MessageBus messageBus = MessageBusUtil.getMessageBus();
700
701 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
702 String searchWriterDestinationName =
703 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
704
705 Destination searchWriteDestination = messageBus.getDestination(
706 searchWriterDestinationName);
707
708 if (searchWriteDestination != null) {
709 ThreadPoolExecutor threadPoolExecutor =
710 PortalExecutorManagerUtil.getPortalExecutor(
711 searchWriterDestinationName);
712
713 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
714
715 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
716
717 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
718 countDownLatch);
719
720 for (int i = 0; i < maxPoolSize; i++) {
721 threadPoolExecutor.submit(shutdownSyncJob);
722 }
723
724 try {
725 countDownLatch.await();
726 }
727 catch (InterruptedException ie) {
728 _log.error("Shutdown waiting interrupted", ie);
729 }
730
731 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
732
733 if (_log.isDebugEnabled()) {
734 _log.debug(
735 "Cancelled appending indexing jobs: " + runnables);
736 }
737
738 searchWriteDestination.close(true);
739 }
740 }
741
742 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
743 indexAccessor.close();
744 }
745 }
746
747 @Override
748 public void shutdown(long companyId) {
749 IndexAccessor indexAccessor = getIndexAccessor(companyId);
750
751 _indexAccessors.remove(indexAccessor);
752
753 indexAccessor.close();
754 }
755
756 @Override
757 public void startup(long companyId) {
758 if (!PropsValues.INDEX_ON_STARTUP) {
759 return;
760 }
761
762 if (_log.isInfoEnabled()) {
763 _log.info("Indexing Lucene on startup");
764 }
765
766 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
767
768 if (PropsValues.INDEX_WITH_THREAD) {
769 if (_luceneIndexThreadPoolExecutor == null) {
770
771
772
773
774
775 _luceneIndexThreadPoolExecutor =
776 PortalExecutorManagerUtil.getPortalExecutor(
777 LuceneHelperImpl.class.getName());
778 }
779
780 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
781 }
782 else {
783 luceneIndexer.reindex();
784 }
785 }
786
787 @Override
788 public void updateDocument(long companyId, Term term, Document document)
789 throws IOException {
790
791 IndexAccessor indexAccessor = getIndexAccessor(companyId);
792
793 indexAccessor.updateDocument(term, document);
794 }
795
796 private LuceneHelperImpl() {
797 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
798 _luceneIndexThreadPoolExecutor =
799 PortalExecutorManagerUtil.getPortalExecutor(
800 LuceneHelperImpl.class.getName());
801 }
802
803 if (isLoadIndexFromClusterEnabled()) {
804 _loadIndexClusterEventListener =
805 new LoadIndexClusterEventListener();
806
807 ClusterExecutorUtil.addClusterEventListener(
808 _loadIndexClusterEventListener);
809 }
810
811 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
812
813 if (StringUtil.equalsIgnoreCase(
814 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
815
816 _protocol = Http.HTTPS;
817 }
818 else {
819 _protocol = Http.HTTP;
820 }
821 }
822
823 private ObjectValuePair<String, URL>
824 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
825 throws SystemException {
826
827 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
828 new MethodHandler(
829 _createTokenMethodKey,
830 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
831 bootupAddress);
832
833 FutureClusterResponses futureClusterResponses =
834 ClusterExecutorUtil.execute(clusterRequest);
835
836 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
837 futureClusterResponses.getPartialResults();
838
839 try {
840 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
841 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
842 TimeUnit.MILLISECONDS);
843
844 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
845
846 int port = clusterNode.getPort();
847
848 if (port <= 0) {
849 StringBundler sb = new StringBundler(6);
850
851 sb.append("Invalid cluster node port ");
852 sb.append(port);
853 sb.append(". The port is set by the first request or ");
854 sb.append("configured in portal.properties by the properties ");
855 sb.append("\"portal.instance.http.port\" and ");
856 sb.append("\"portal.instance.https.port\".");
857
858 throw new Exception(sb.toString());
859 }
860
861 InetAddress inetAddress = clusterNode.getInetAddress();
862
863 String fileName = PortalUtil.getPathContext();
864
865 if (!fileName.endsWith(StringPool.SLASH)) {
866 fileName = fileName.concat(StringPool.SLASH);
867 }
868
869 fileName = fileName.concat("lucene/dump");
870
871 URL url = new URL(
872 _protocol, inetAddress.getHostAddress(), port, fileName);
873
874 String transientToken = (String)clusterNodeResponse.getResult();
875
876 return new ObjectValuePair<String, URL>(transientToken, url);
877 }
878 catch (Exception e) {
879 throw new SystemException(e);
880 }
881 }
882
883 private void _includeIfUnique(
884 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
885 Query query, BooleanClause.Occur occur) {
886
887 if (query instanceof TermQuery) {
888 Set<Term> terms = new HashSet<Term>();
889
890 TermQuery termQuery = (TermQuery)query;
891
892 termQuery.extractTerms(terms);
893
894 float boost = termQuery.getBoost();
895
896 for (Term term : terms) {
897 String termValue = term.text();
898
899 if (like) {
900 termValue = termValue.toLowerCase(queryParser.getLocale());
901
902 term = term.createTerm(
903 StringPool.STAR.concat(termValue).concat(
904 StringPool.STAR));
905
906 query = new WildcardQuery(term);
907 }
908 else {
909 query = new TermQuery(term);
910 }
911
912 query.setBoost(boost);
913
914 boolean included = false;
915
916 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
917 if (query.equals(booleanClause.getQuery())) {
918 included = true;
919 }
920 }
921
922 if (!included) {
923 booleanQuery.add(query, occur);
924 }
925 }
926 }
927 else if (query instanceof BooleanQuery) {
928 BooleanQuery curBooleanQuery = (BooleanQuery)query;
929
930 BooleanQuery containerBooleanQuery = new BooleanQuery();
931
932 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
933 _includeIfUnique(
934 containerBooleanQuery, like, queryParser,
935 booleanClause.getQuery(), booleanClause.getOccur());
936 }
937
938 if (containerBooleanQuery.getClauses().length > 0) {
939 booleanQuery.add(containerBooleanQuery, occur);
940 }
941 }
942 else {
943 boolean included = false;
944
945 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
946 if (query.equals(booleanClause.getQuery())) {
947 included = true;
948 }
949 }
950
951 if (!included) {
952 booleanQuery.add(query, occur);
953 }
954 }
955 }
956
957 private void _loadIndexFromCluster(
958 IndexAccessor indexAccessor, long localLastGeneration)
959 throws SystemException {
960
961 List<Address> clusterNodeAddresses =
962 ClusterExecutorUtil.getClusterNodeAddresses();
963
964 int clusterNodeAddressesCount = clusterNodeAddresses.size();
965
966 if (clusterNodeAddressesCount <= 1) {
967 if (_log.isDebugEnabled()) {
968 _log.debug(
969 "Do not load indexes because there is either one portal " +
970 "instance or no portal instances in the cluster");
971 }
972
973 return;
974 }
975
976 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
977 new MethodHandler(
978 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
979 true);
980
981 ClusterExecutorUtil.execute(
982 clusterRequest,
983 new LoadIndexClusterResponseCallback(
984 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
985 }
986
987 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
988 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
989
990 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
991 GetterUtil.getInteger(
992 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
993 BooleanQuery.getMaxClauseCount());
994
995 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
996
997 private static MethodKey _createTokenMethodKey = new MethodKey(
998 TransientTokenUtil.class, "createToken", long.class);
999 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1000 LuceneHelperUtil.class, "getLastGeneration", long.class);
1001
1002 private Analyzer _analyzer;
1003 private Map<Long, IndexAccessor> _indexAccessors =
1004 new ConcurrentHashMap<Long, IndexAccessor>();
1005 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1006 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1007 private String _protocol;
1008 private Version _version;
1009
1010 private static class ShutdownSyncJob implements Runnable {
1011
1012 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1013 _countDownLatch = countDownLatch;
1014 }
1015
1016 @Override
1017 public void run() {
1018 _countDownLatch.countDown();
1019
1020 try {
1021 synchronized (this) {
1022 wait();
1023 }
1024 }
1025 catch (InterruptedException ie) {
1026 }
1027 }
1028
1029 private final CountDownLatch _countDownLatch;
1030
1031 }
1032
1033 private class LoadIndexClusterEventListener
1034 implements ClusterEventListener {
1035
1036 @Override
1037 public void processClusterEvent(ClusterEvent clusterEvent) {
1038 ClusterEventType clusterEventType =
1039 clusterEvent.getClusterEventType();
1040
1041 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1042 return;
1043 }
1044
1045 List<Address> clusterNodeAddresses =
1046 ClusterExecutorUtil.getClusterNodeAddresses();
1047 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1048
1049 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1050 if (_log.isDebugEnabled()) {
1051 _log.debug(
1052 "Number of original cluster members is greater than " +
1053 "one");
1054 }
1055
1056 return;
1057 }
1058
1059 long[] companyIds = PortalInstances.getCompanyIds();
1060
1061 for (long companyId : companyIds) {
1062 loadIndexes(companyId);
1063 }
1064
1065 loadIndexes(CompanyConstants.SYSTEM);
1066 }
1067
1068 private void loadIndexes(long companyId) {
1069 long lastGeneration = getLastGeneration(companyId);
1070
1071 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1072 return;
1073 }
1074
1075 try {
1076 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1077 }
1078 catch (Exception e) {
1079 _log.error(
1080 "Unable to load indexes for company " + companyId, e);
1081 }
1082 }
1083
1084 }
1085
1086 private class LoadIndexClusterResponseCallback
1087 extends BaseClusterResponseCallback {
1088
1089 public LoadIndexClusterResponseCallback(
1090 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1091 long localLastGeneration) {
1092
1093 _indexAccessor = indexAccessor;
1094 _clusterNodeAddressesCount = clusterNodeAddressesCount;
1095 _localLastGeneration = localLastGeneration;
1096
1097 _companyId = _indexAccessor.getCompanyId();
1098 }
1099
1100 @Override
1101 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1102 Address bootupAddress = null;
1103
1104 do {
1105 _clusterNodeAddressesCount--;
1106
1107 ClusterNodeResponse clusterNodeResponse = null;
1108
1109 try {
1110 clusterNodeResponse = blockingQueue.poll(
1111 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1112 TimeUnit.MILLISECONDS);
1113 }
1114 catch (Exception e) {
1115 _log.error("Unable to get cluster node response", e);
1116 }
1117
1118 if (clusterNodeResponse == null) {
1119 if (_log.isDebugEnabled()) {
1120 _log.debug(
1121 "Unable to get cluster node response in " +
1122 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1123 TimeUnit.MILLISECONDS);
1124 }
1125
1126 continue;
1127 }
1128
1129 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1130
1131 if (clusterNode.getPort() > 0) {
1132 try {
1133 long remoteLastGeneration =
1134 (Long)clusterNodeResponse.getResult();
1135
1136 if (remoteLastGeneration > _localLastGeneration) {
1137 bootupAddress = clusterNodeResponse.getAddress();
1138
1139 break;
1140 }
1141 }
1142 catch (Exception e) {
1143 if (_log.isDebugEnabled()) {
1144 _log.debug(
1145 "Suppress exception caused by remote method " +
1146 "invocation",
1147 e);
1148 }
1149
1150 continue;
1151 }
1152 }
1153 else {
1154 if (_log.isDebugEnabled()) {
1155 _log.debug(
1156 "Cluster node " + clusterNode +
1157 " has invalid port");
1158 }
1159 }
1160 }
1161 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1162
1163 if (bootupAddress == null) {
1164 return;
1165 }
1166
1167 if (_log.isInfoEnabled()) {
1168 _log.info(
1169 "Start loading lucene index files from cluster node " +
1170 bootupAddress);
1171 }
1172
1173 InputStream inputStream = null;
1174
1175 try {
1176 inputStream = getLoadIndexesInputStreamFromCluster(
1177 _companyId, bootupAddress);
1178
1179 _indexAccessor.loadIndex(inputStream);
1180
1181 if (_log.isInfoEnabled()) {
1182 _log.info("Lucene index files loaded successfully");
1183 }
1184 }
1185 catch (Exception e) {
1186 _log.error("Unable to load index for company " + _companyId, e);
1187 }
1188 finally {
1189 if (inputStream != null) {
1190 try {
1191 inputStream.close();
1192 }
1193 catch (IOException ioe) {
1194 _log.error(
1195 "Unable to close input stream for company " +
1196 _companyId,
1197 ioe);
1198 }
1199 }
1200 }
1201 }
1202
1203 @Override
1204 public void processTimeoutException(TimeoutException timeoutException) {
1205 _log.error(
1206 "Unable to load index for company " + _companyId,
1207 timeoutException);
1208 }
1209
1210 private int _clusterNodeAddressesCount;
1211 private long _companyId;
1212 private IndexAccessor _indexAccessor;
1213 private long _localLastGeneration;
1214
1215 }
1216
1217 }