001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Destination;
036 import com.liferay.portal.kernel.messaging.MessageBus;
037 import com.liferay.portal.kernel.messaging.MessageBusUtil;
038 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039 import com.liferay.portal.kernel.search.BooleanClauseOccur;
040 import com.liferay.portal.kernel.search.Field;
041 import com.liferay.portal.kernel.search.SearchEngineUtil;
042 import com.liferay.portal.kernel.util.ArrayUtil;
043 import com.liferay.portal.kernel.util.GetterUtil;
044 import com.liferay.portal.kernel.util.MethodHandler;
045 import com.liferay.portal.kernel.util.MethodKey;
046 import com.liferay.portal.kernel.util.ObjectValuePair;
047 import com.liferay.portal.kernel.util.PropsKeys;
048 import com.liferay.portal.kernel.util.StringBundler;
049 import com.liferay.portal.kernel.util.StringPool;
050 import com.liferay.portal.kernel.util.StringUtil;
051 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
052 import com.liferay.portal.kernel.util.Validator;
053 import com.liferay.portal.model.CompanyConstants;
054 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
055 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
056 import com.liferay.portal.security.auth.TransientTokenUtil;
057 import com.liferay.portal.util.PortalInstances;
058 import com.liferay.portal.util.PortalUtil;
059 import com.liferay.portal.util.PropsUtil;
060 import com.liferay.portal.util.PropsValues;
061
062 import java.io.IOException;
063 import java.io.InputStream;
064 import java.io.OutputStream;
065
066 import java.net.InetAddress;
067 import java.net.URL;
068 import java.net.URLConnection;
069
070 import java.util.HashSet;
071 import java.util.List;
072 import java.util.Map;
073 import java.util.Set;
074 import java.util.concurrent.BlockingQueue;
075 import java.util.concurrent.ConcurrentHashMap;
076 import java.util.concurrent.CountDownLatch;
077 import java.util.concurrent.TimeUnit;
078 import java.util.concurrent.TimeoutException;
079
080 import org.apache.commons.lang.time.StopWatch;
081 import org.apache.lucene.analysis.Analyzer;
082 import org.apache.lucene.analysis.TokenStream;
083 import org.apache.lucene.document.Document;
084 import org.apache.lucene.index.IndexReader;
085 import org.apache.lucene.index.Term;
086 import org.apache.lucene.queryParser.QueryParser;
087 import org.apache.lucene.search.BooleanClause;
088 import org.apache.lucene.search.BooleanQuery;
089 import org.apache.lucene.search.IndexSearcher;
090 import org.apache.lucene.search.NumericRangeQuery;
091 import org.apache.lucene.search.Query;
092 import org.apache.lucene.search.TermQuery;
093 import org.apache.lucene.search.TermRangeQuery;
094 import org.apache.lucene.search.WildcardQuery;
095 import org.apache.lucene.search.highlight.Formatter;
096 import org.apache.lucene.search.highlight.Highlighter;
097 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
098 import org.apache.lucene.search.highlight.QueryScorer;
099 import org.apache.lucene.search.highlight.SimpleFragmenter;
100 import org.apache.lucene.search.highlight.WeightedTerm;
101 import org.apache.lucene.util.Version;
102
103
112 public class LuceneHelperImpl implements LuceneHelper {
113
114 @Override
115 public void addDocument(long companyId, Document document)
116 throws IOException {
117
118 IndexAccessor indexAccessor = getIndexAccessor(companyId);
119
120 indexAccessor.addDocument(document);
121 }
122
123 @Override
124 public void addExactTerm(
125 BooleanQuery booleanQuery, String field, String value) {
126
127 addTerm(booleanQuery, field, value, false);
128 }
129
130 @Override
131 public void addNumericRangeTerm(
132 BooleanQuery booleanQuery, String field, Integer startValue,
133 Integer endValue) {
134
135 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
136 field, startValue, endValue, true, true);
137
138 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
139 }
140
141 @Override
142 public void addNumericRangeTerm(
143 BooleanQuery booleanQuery, String field, Long startValue,
144 Long endValue) {
145
146 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
147 field, startValue, endValue, true, true);
148
149 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
150 }
151
152
156 @Override
157 public void addNumericRangeTerm(
158 BooleanQuery booleanQuery, String field, String startValue,
159 String endValue) {
160
161 addNumericRangeTerm(
162 booleanQuery, field, GetterUtil.getLong(startValue),
163 GetterUtil.getLong(endValue));
164 }
165
166 @Override
167 public void addRangeTerm(
168 BooleanQuery booleanQuery, String field, String startValue,
169 String endValue) {
170
171 boolean includesLower = true;
172
173 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
174 includesLower = false;
175 }
176
177 boolean includesUpper = true;
178
179 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
180 includesUpper = false;
181 }
182
183 TermRangeQuery termRangeQuery = new TermRangeQuery(
184 field, startValue, endValue, includesLower, includesUpper);
185
186 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
187 }
188
189 @Override
190 public void addRequiredTerm(
191 BooleanQuery booleanQuery, String field, String value, boolean like) {
192
193 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
194 }
195
196 @Override
197 public void addRequiredTerm(
198 BooleanQuery booleanQuery, String field, String[] values,
199 boolean like) {
200
201 if (values == null) {
202 return;
203 }
204
205 BooleanQuery query = new BooleanQuery();
206
207 for (String value : values) {
208 addTerm(query, field, value, like);
209 }
210
211 booleanQuery.add(query, BooleanClause.Occur.MUST);
212 }
213
214 @Override
215 public void addTerm(
216 BooleanQuery booleanQuery, String field, String value, boolean like) {
217
218 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
219 }
220
221 @Override
222 public void addTerm(
223 BooleanQuery booleanQuery, String field, String value, boolean like,
224 BooleanClauseOccur booleanClauseOccur) {
225
226 if (Validator.isNull(value)) {
227 return;
228 }
229
230 Analyzer analyzer = getAnalyzer();
231
232 if (analyzer instanceof PerFieldAnalyzer) {
233 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
234
235 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
236
237 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
238 like = true;
239 }
240 }
241
242 if (like) {
243 value = StringUtil.replace(
244 value, StringPool.PERCENT, StringPool.BLANK);
245 }
246
247 try {
248 QueryParser queryParser = new QueryParser(
249 getVersion(), field, analyzer);
250
251 Query query = queryParser.parse(value);
252
253 BooleanClause.Occur occur = null;
254
255 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
256 occur = BooleanClause.Occur.MUST;
257 }
258 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
259 occur = BooleanClause.Occur.MUST_NOT;
260 }
261 else {
262 occur = BooleanClause.Occur.SHOULD;
263 }
264
265 _includeIfUnique(booleanQuery, like, queryParser, query, occur);
266 }
267 catch (Exception e) {
268 if (_log.isWarnEnabled()) {
269 _log.warn(e, e);
270 }
271 }
272 }
273
274 @Override
275 public void addTerm(
276 BooleanQuery booleanQuery, String field, String[] values,
277 boolean like) {
278
279 for (String value : values) {
280 addTerm(booleanQuery, field, value, like);
281 }
282 }
283
284
288 @Deprecated
289 @Override
290 public void cleanUp(IndexSearcher indexSearcher) {
291 if (indexSearcher == null) {
292 return;
293 }
294
295 try {
296 indexSearcher.close();
297
298 IndexReader indexReader = indexSearcher.getIndexReader();
299
300 if (indexReader != null) {
301 indexReader.close();
302 }
303 }
304 catch (IOException ioe) {
305 _log.error(ioe, ioe);
306 }
307 }
308
309 @Override
310 public int countScoredFieldNames(Query query, String[] filedNames) {
311 int count = 0;
312
313 for (String fieldName : filedNames) {
314 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
315 query, false, fieldName);
316
317 if ((weightedTerms.length > 0) &&
318 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
319
320 count++;
321 }
322 }
323
324 return count;
325 }
326
327 @Override
328 public void delete(long companyId) {
329 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
330
331 if (indexAccessor == null) {
332 return;
333 }
334
335 indexAccessor.delete();
336 }
337
338 @Override
339 public void deleteDocuments(long companyId, Term term) throws IOException {
340 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
341
342 if (indexAccessor == null) {
343 return;
344 }
345
346 indexAccessor.deleteDocuments(term);
347 }
348
349 @Override
350 public void dumpIndex(long companyId, OutputStream outputStream)
351 throws IOException {
352
353 long lastGeneration = getLastGeneration(companyId);
354
355 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
356 if (_log.isDebugEnabled()) {
357 _log.debug(
358 "Dump index from cluster is not enabled for " + companyId);
359 }
360
361 return;
362 }
363
364 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
365
366 if (indexAccessor == null) {
367 return;
368 }
369
370 indexAccessor.dumpIndex(outputStream);
371 }
372
373 @Override
374 public Analyzer getAnalyzer() {
375 return _analyzer;
376 }
377
378 @Override
379 public IndexAccessor getIndexAccessor(long companyId) {
380 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
381
382 if (indexAccessor != null) {
383 return indexAccessor;
384 }
385
386 synchronized (this) {
387 indexAccessor = _indexAccessors.get(companyId);
388
389 if (indexAccessor == null) {
390 indexAccessor = new IndexAccessorImpl(companyId);
391
392 if (isLoadIndexFromClusterEnabled()) {
393 indexAccessor = new SynchronizedIndexAccessorImpl(
394 indexAccessor);
395
396 boolean clusterForwardMessage = GetterUtil.getBoolean(
397 MessageValuesThreadLocal.getValue(
398 ClusterLink.CLUSTER_FORWARD_MESSAGE));
399
400 if (clusterForwardMessage) {
401 if (_log.isInfoEnabled()) {
402 _log.info(
403 "Skip Luncene index files cluster loading " +
404 "since this is a manual reindex request");
405 }
406 }
407 else {
408 try {
409 _loadIndexFromCluster(
410 indexAccessor,
411 indexAccessor.getLastGeneration());
412 }
413 catch (Exception e) {
414 _log.error(
415 "Unable to load index for company " +
416 indexAccessor.getCompanyId(),
417 e);
418 }
419 }
420 }
421
422 _indexAccessors.put(companyId, indexAccessor);
423 }
424 }
425
426 return indexAccessor;
427 }
428
429 @Override
430 public IndexSearcher getIndexSearcher(long companyId) throws IOException {
431 IndexAccessor indexAccessor = getIndexAccessor(companyId);
432
433 return indexAccessor.acquireIndexSearcher();
434 }
435
436 @Override
437 public long getLastGeneration(long companyId) {
438 if (!isLoadIndexFromClusterEnabled()) {
439 return IndexAccessor.DEFAULT_LAST_GENERATION;
440 }
441
442 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
443
444 if (indexAccessor == null) {
445 return IndexAccessor.DEFAULT_LAST_GENERATION;
446 }
447
448 return indexAccessor.getLastGeneration();
449 }
450
451 @Override
452 public InputStream getLoadIndexesInputStreamFromCluster(
453 long companyId, Address bootupAddress)
454 throws SystemException {
455
456 if (!isLoadIndexFromClusterEnabled()) {
457 return null;
458 }
459
460 InputStream inputStream = null;
461
462 try {
463 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
464 _getBootupClusterNodeObjectValuePair(bootupAddress);
465
466 URL url = bootupClusterNodeObjectValuePair.getValue();
467
468 URLConnection urlConnection = url.openConnection();
469
470 urlConnection.setDoOutput(true);
471
472 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
473 urlConnection.getOutputStream());
474
475 unsyncPrintWriter.write("transientToken=");
476 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
477 unsyncPrintWriter.write("&companyId=");
478 unsyncPrintWriter.write(String.valueOf(companyId));
479
480 unsyncPrintWriter.close();
481
482 inputStream = urlConnection.getInputStream();
483
484 return inputStream;
485 }
486 catch (IOException ioe) {
487 throw new SystemException(ioe);
488 }
489 }
490
491 @Override
492 public Set<String> getQueryTerms(Query query) {
493 String queryString = StringUtil.replace(
494 query.toString(), StringPool.STAR, StringPool.BLANK);
495
496 Query tempQuery = null;
497
498 try {
499 QueryParser queryParser = new QueryParser(
500 getVersion(), StringPool.BLANK, getAnalyzer());
501
502 tempQuery = queryParser.parse(queryString);
503 }
504 catch (Exception e) {
505 if (_log.isWarnEnabled()) {
506 _log.warn("Unable to parse " + queryString);
507 }
508
509 tempQuery = query;
510 }
511
512 WeightedTerm[] weightedTerms = null;
513
514 for (String fieldName : Field.KEYWORDS) {
515 weightedTerms = QueryTermExtractor.getTerms(
516 tempQuery, false, fieldName);
517
518 if (weightedTerms.length > 0) {
519 break;
520 }
521 }
522
523 Set<String> queryTerms = new HashSet<String>();
524
525 for (WeightedTerm weightedTerm : weightedTerms) {
526 queryTerms.add(weightedTerm.getTerm());
527 }
528
529 return queryTerms;
530 }
531
532
535 @Deprecated
536 @Override
537 public IndexSearcher getSearcher(long companyId, boolean readOnly)
538 throws IOException {
539
540 IndexAccessor indexAccessor = getIndexAccessor(companyId);
541
542 IndexReader indexReader = IndexReader.open(
543 indexAccessor.getLuceneDir(), readOnly);
544
545 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
546
547 indexSearcher.setDefaultFieldSortScoring(true, false);
548 indexSearcher.setSimilarity(new FieldWeightSimilarity());
549
550 return indexSearcher;
551 }
552
553 @Override
554 public String getSnippet(
555 Query query, String field, String s, int maxNumFragments,
556 int fragmentLength, String fragmentSuffix, Formatter formatter)
557 throws IOException {
558
559 QueryScorer queryScorer = new QueryScorer(query, field);
560
561 Highlighter highlighter = new Highlighter(formatter, queryScorer);
562
563 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
564
565 TokenStream tokenStream = getAnalyzer().tokenStream(
566 field, new UnsyncStringReader(s));
567
568 try {
569 String snippet = highlighter.getBestFragments(
570 tokenStream, s, maxNumFragments, fragmentSuffix);
571
572 if (Validator.isNotNull(snippet) &&
573 !StringUtil.endsWith(snippet, fragmentSuffix) &&
574 !s.equals(snippet)) {
575
576 snippet = snippet.concat(fragmentSuffix);
577 }
578
579 return snippet;
580 }
581 catch (InvalidTokenOffsetsException itoe) {
582 throw new IOException(itoe.getMessage());
583 }
584 }
585
586 @Override
587 public Version getVersion() {
588 return _version;
589 }
590
591 @Override
592 public boolean isLoadIndexFromClusterEnabled() {
593 if (PropsValues.CLUSTER_LINK_ENABLED &&
594 PropsValues.LUCENE_REPLICATE_WRITE) {
595
596 return true;
597 }
598
599 if (_log.isDebugEnabled()) {
600 _log.debug("Load index from cluster is not enabled");
601 }
602
603 return false;
604 }
605
606 @Override
607 public void loadIndex(long companyId, InputStream inputStream)
608 throws IOException {
609
610 if (!isLoadIndexFromClusterEnabled()) {
611 return;
612 }
613
614 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
615
616 if (indexAccessor == null) {
617 if (_log.isInfoEnabled()) {
618 _log.info(
619 "Skip loading Lucene index files for company " + companyId +
620 " in favor of lazy loading");
621 }
622
623 return;
624 }
625
626 StopWatch stopWatch = new StopWatch();
627
628 stopWatch.start();
629
630 if (_log.isInfoEnabled()) {
631 _log.info(
632 "Start loading Lucene index files for company " + companyId);
633 }
634
635 indexAccessor.loadIndex(inputStream);
636
637 if (_log.isInfoEnabled()) {
638 _log.info(
639 "Finished loading index files for company " + companyId +
640 " in " + stopWatch.getTime() + " ms");
641 }
642 }
643
644 @Override
645 public void loadIndexesFromCluster(long companyId) throws SystemException {
646 if (!isLoadIndexFromClusterEnabled()) {
647 return;
648 }
649
650 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
651
652 if (indexAccessor == null) {
653 return;
654 }
655
656 long localLastGeneration = getLastGeneration(companyId);
657
658 _loadIndexFromCluster(indexAccessor, localLastGeneration);
659 }
660
661 @Override
662 public void releaseIndexSearcher(
663 long companyId, IndexSearcher indexSearcher)
664 throws IOException {
665
666 IndexAccessor indexAccessor = getIndexAccessor(companyId);
667
668 indexAccessor.releaseIndexSearcher(indexSearcher);
669 }
670
671 public void setAnalyzer(Analyzer analyzer) {
672 _analyzer = analyzer;
673 }
674
675 public void setVersion(Version version) {
676 _version = version;
677 }
678
679 @Override
680 public void shutdown() {
681 if (_luceneIndexThreadPoolExecutor != null) {
682 _luceneIndexThreadPoolExecutor.shutdownNow();
683
684 try {
685 _luceneIndexThreadPoolExecutor.awaitTermination(
686 60, TimeUnit.SECONDS);
687 }
688 catch (InterruptedException ie) {
689 _log.error("Lucene indexer shutdown interrupted", ie);
690 }
691 }
692
693 if (isLoadIndexFromClusterEnabled()) {
694 ClusterExecutorUtil.removeClusterEventListener(
695 _loadIndexClusterEventListener);
696 }
697
698 MessageBus messageBus = MessageBusUtil.getMessageBus();
699
700 for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
701 String searchWriterDestinationName =
702 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
703
704 Destination searchWriteDestination = messageBus.getDestination(
705 searchWriterDestinationName);
706
707 if (searchWriteDestination != null) {
708 ThreadPoolExecutor threadPoolExecutor =
709 PortalExecutorManagerUtil.getPortalExecutor(
710 searchWriterDestinationName);
711
712 int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
713
714 CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
715
716 ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
717 countDownLatch);
718
719 for (int i = 0; i < maxPoolSize; i++) {
720 threadPoolExecutor.submit(shutdownSyncJob);
721 }
722
723 try {
724 countDownLatch.await();
725 }
726 catch (InterruptedException ie) {
727 _log.error("Shutdown waiting interrupted", ie);
728 }
729
730 List<Runnable> runnables = threadPoolExecutor.shutdownNow();
731
732 if (_log.isDebugEnabled()) {
733 _log.debug(
734 "Cancelled appending indexing jobs: " + runnables);
735 }
736
737 searchWriteDestination.close(true);
738 }
739 }
740
741 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
742 indexAccessor.close();
743 }
744 }
745
746 @Override
747 public void shutdown(long companyId) {
748 IndexAccessor indexAccessor = getIndexAccessor(companyId);
749
750 _indexAccessors.remove(indexAccessor);
751
752 indexAccessor.close();
753 }
754
755 @Override
756 public void startup(long companyId) {
757 if (!PropsValues.INDEX_ON_STARTUP) {
758 return;
759 }
760
761 if (_log.isInfoEnabled()) {
762 _log.info("Indexing Lucene on startup");
763 }
764
765 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
766
767 if (PropsValues.INDEX_WITH_THREAD) {
768 if (_luceneIndexThreadPoolExecutor == null) {
769
770
771
772
773
774 _luceneIndexThreadPoolExecutor =
775 PortalExecutorManagerUtil.getPortalExecutor(
776 LuceneHelperImpl.class.getName());
777 }
778
779 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
780 }
781 else {
782 luceneIndexer.reindex();
783 }
784 }
785
786 @Override
787 public void updateDocument(long companyId, Term term, Document document)
788 throws IOException {
789
790 IndexAccessor indexAccessor = getIndexAccessor(companyId);
791
792 indexAccessor.updateDocument(term, document);
793 }
794
795 private LuceneHelperImpl() {
796 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
797 _luceneIndexThreadPoolExecutor =
798 PortalExecutorManagerUtil.getPortalExecutor(
799 LuceneHelperImpl.class.getName());
800 }
801
802 if (isLoadIndexFromClusterEnabled()) {
803 _loadIndexClusterEventListener =
804 new LoadIndexClusterEventListener();
805
806 ClusterExecutorUtil.addClusterEventListener(
807 _loadIndexClusterEventListener);
808 }
809
810 BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
811 }
812
813 private ObjectValuePair<String, URL>
814 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
815 throws SystemException {
816
817 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
818 new MethodHandler(
819 _createTokenMethodKey,
820 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
821 bootupAddress);
822
823 FutureClusterResponses futureClusterResponses =
824 ClusterExecutorUtil.execute(clusterRequest);
825
826 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
827 futureClusterResponses.getPartialResults();
828
829 try {
830 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
831 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
832 TimeUnit.MILLISECONDS);
833
834 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
835
836 int port = clusterNode.getPort();
837
838 if (port <= 0) {
839 StringBundler sb = new StringBundler(6);
840
841 sb.append("Invalid cluster node port ");
842 sb.append(port);
843 sb.append(". The port is set by the first request or ");
844 sb.append("configured in portal.properties by the properties ");
845 sb.append("\"portal.instance.http.port\" and ");
846 sb.append("\"portal.instance.https.port\".");
847
848 throw new Exception(sb.toString());
849 }
850
851 String protocol = clusterNode.getPortalProtocol();
852
853 if (Validator.isNull(protocol)) {
854 StringBundler sb = new StringBundler(4);
855
856 sb.append("Cluster node protocol is empty. The protocol is ");
857 sb.append("set by the first request or configured in ");
858 sb.append("portal.properties by the property ");
859 sb.append("\"portal.instance.protocol\"");
860
861 throw new Exception(sb.toString());
862 }
863
864 InetAddress inetAddress = clusterNode.getInetAddress();
865
866 String fileName = PortalUtil.getPathContext();
867
868 if (!fileName.endsWith(StringPool.SLASH)) {
869 fileName = fileName.concat(StringPool.SLASH);
870 }
871
872 fileName = fileName.concat("lucene/dump");
873
874 URL url = new URL(
875 protocol, inetAddress.getHostAddress(), port, fileName);
876
877 String transientToken = (String)clusterNodeResponse.getResult();
878
879 return new ObjectValuePair<String, URL>(transientToken, url);
880 }
881 catch (Exception e) {
882 throw new SystemException(e);
883 }
884 }
885
886 private void _includeIfUnique(
887 BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
888 Query query, BooleanClause.Occur occur) {
889
890 if (query instanceof TermQuery) {
891 Set<Term> terms = new HashSet<Term>();
892
893 TermQuery termQuery = (TermQuery)query;
894
895 termQuery.extractTerms(terms);
896
897 float boost = termQuery.getBoost();
898
899 for (Term term : terms) {
900 String termValue = term.text();
901
902 if (like) {
903 termValue = termValue.toLowerCase(queryParser.getLocale());
904
905 term = term.createTerm(
906 StringPool.STAR.concat(termValue).concat(
907 StringPool.STAR));
908
909 query = new WildcardQuery(term);
910 }
911 else {
912 query = new TermQuery(term);
913 }
914
915 query.setBoost(boost);
916
917 boolean included = false;
918
919 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
920 if (query.equals(booleanClause.getQuery())) {
921 included = true;
922 }
923 }
924
925 if (!included) {
926 booleanQuery.add(query, occur);
927 }
928 }
929 }
930 else if (query instanceof BooleanQuery) {
931 BooleanQuery curBooleanQuery = (BooleanQuery)query;
932
933 BooleanQuery containerBooleanQuery = new BooleanQuery();
934
935 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
936 _includeIfUnique(
937 containerBooleanQuery, like, queryParser,
938 booleanClause.getQuery(), booleanClause.getOccur());
939 }
940
941 if (containerBooleanQuery.getClauses().length > 0) {
942 booleanQuery.add(containerBooleanQuery, occur);
943 }
944 }
945 else {
946 boolean included = false;
947
948 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
949 if (query.equals(booleanClause.getQuery())) {
950 included = true;
951 }
952 }
953
954 if (!included) {
955 booleanQuery.add(query, occur);
956 }
957 }
958 }
959
960 private void _loadIndexFromCluster(
961 IndexAccessor indexAccessor, long localLastGeneration)
962 throws SystemException {
963
964 List<Address> clusterNodeAddresses =
965 ClusterExecutorUtil.getClusterNodeAddresses();
966
967 int clusterNodeAddressesCount = clusterNodeAddresses.size();
968
969 if (clusterNodeAddressesCount <= 1) {
970 if (_log.isDebugEnabled()) {
971 _log.debug(
972 "Do not load indexes because there is either one portal " +
973 "instance or no portal instances in the cluster");
974 }
975
976 return;
977 }
978
979 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
980 new MethodHandler(
981 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
982 true);
983
984 ClusterExecutorUtil.execute(
985 clusterRequest,
986 new LoadIndexClusterResponseCallback(
987 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
988 }
989
990 private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
991 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
992
993 private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
994 GetterUtil.getInteger(
995 PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
996 BooleanQuery.getMaxClauseCount());
997
998 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
999
1000 private static MethodKey _createTokenMethodKey = new MethodKey(
1001 TransientTokenUtil.class, "createToken", long.class);
1002 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1003 LuceneHelperUtil.class, "getLastGeneration", long.class);
1004
1005 private Analyzer _analyzer;
1006 private Map<Long, IndexAccessor> _indexAccessors =
1007 new ConcurrentHashMap<Long, IndexAccessor>();
1008 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1009 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1010 private Version _version;
1011
1012 private static class ShutdownSyncJob implements Runnable {
1013
1014 public ShutdownSyncJob(CountDownLatch countDownLatch) {
1015 _countDownLatch = countDownLatch;
1016 }
1017
1018 @Override
1019 public void run() {
1020 _countDownLatch.countDown();
1021
1022 try {
1023 synchronized (this) {
1024 wait();
1025 }
1026 }
1027 catch (InterruptedException ie) {
1028 }
1029 }
1030
1031 private final CountDownLatch _countDownLatch;
1032
1033 }
1034
1035 private class LoadIndexClusterEventListener
1036 implements ClusterEventListener {
1037
1038 @Override
1039 public void processClusterEvent(ClusterEvent clusterEvent) {
1040 ClusterEventType clusterEventType =
1041 clusterEvent.getClusterEventType();
1042
1043 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1044 return;
1045 }
1046
1047 List<Address> clusterNodeAddresses =
1048 ClusterExecutorUtil.getClusterNodeAddresses();
1049 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1050
1051 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1052 if (_log.isDebugEnabled()) {
1053 _log.debug(
1054 "Number of original cluster members is greater than " +
1055 "one");
1056 }
1057
1058 return;
1059 }
1060
1061 long[] companyIds = PortalInstances.getCompanyIds();
1062
1063 for (long companyId : companyIds) {
1064 loadIndexes(companyId);
1065 }
1066
1067 loadIndexes(CompanyConstants.SYSTEM);
1068 }
1069
1070 private void loadIndexes(long companyId) {
1071 long lastGeneration = getLastGeneration(companyId);
1072
1073 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1074 return;
1075 }
1076
1077 try {
1078 LuceneClusterUtil.loadIndexesFromCluster(companyId);
1079 }
1080 catch (Exception e) {
1081 _log.error(
1082 "Unable to load indexes for company " + companyId, e);
1083 }
1084 }
1085
1086 }
1087
1088 private class LoadIndexClusterResponseCallback
1089 extends BaseClusterResponseCallback {
1090
1091 public LoadIndexClusterResponseCallback(
1092 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1093 long localLastGeneration) {
1094
1095 _indexAccessor = indexAccessor;
1096 _clusterNodeAddressesCount = clusterNodeAddressesCount;
1097 _localLastGeneration = localLastGeneration;
1098
1099 _companyId = _indexAccessor.getCompanyId();
1100 }
1101
1102 @Override
1103 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1104 Address bootupAddress = null;
1105
1106 do {
1107 _clusterNodeAddressesCount--;
1108
1109 ClusterNodeResponse clusterNodeResponse = null;
1110
1111 try {
1112 clusterNodeResponse = blockingQueue.poll(
1113 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1114 TimeUnit.MILLISECONDS);
1115 }
1116 catch (Exception e) {
1117 _log.error("Unable to get cluster node response", e);
1118 }
1119
1120 if (clusterNodeResponse == null) {
1121 if (_log.isDebugEnabled()) {
1122 _log.debug(
1123 "Unable to get cluster node response in " +
1124 _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1125 TimeUnit.MILLISECONDS);
1126 }
1127
1128 continue;
1129 }
1130
1131 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1132
1133 if (clusterNode.getPort() > 0) {
1134 try {
1135 long remoteLastGeneration =
1136 (Long)clusterNodeResponse.getResult();
1137
1138 if (remoteLastGeneration > _localLastGeneration) {
1139 bootupAddress = clusterNodeResponse.getAddress();
1140
1141 break;
1142 }
1143 }
1144 catch (Exception e) {
1145 if (_log.isDebugEnabled()) {
1146 _log.debug(
1147 "Suppress exception caused by remote method " +
1148 "invocation",
1149 e);
1150 }
1151
1152 continue;
1153 }
1154 }
1155 else {
1156 if (_log.isDebugEnabled()) {
1157 _log.debug(
1158 "Cluster node " + clusterNode +
1159 " has invalid port");
1160 }
1161 }
1162 }
1163 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1164
1165 if (bootupAddress == null) {
1166 return;
1167 }
1168
1169 if (_log.isInfoEnabled()) {
1170 _log.info(
1171 "Start loading lucene index files from cluster node " +
1172 bootupAddress);
1173 }
1174
1175 InputStream inputStream = null;
1176
1177 try {
1178 inputStream = getLoadIndexesInputStreamFromCluster(
1179 _companyId, bootupAddress);
1180
1181 _indexAccessor.loadIndex(inputStream);
1182
1183 if (_log.isInfoEnabled()) {
1184 _log.info("Lucene index files loaded successfully");
1185 }
1186 }
1187 catch (Exception e) {
1188 _log.error("Unable to load index for company " + _companyId, e);
1189 }
1190 finally {
1191 if (inputStream != null) {
1192 try {
1193 inputStream.close();
1194 }
1195 catch (IOException ioe) {
1196 _log.error(
1197 "Unable to close input stream for company " +
1198 _companyId,
1199 ioe);
1200 }
1201 }
1202 }
1203 }
1204
1205 @Override
1206 public void processTimeoutException(TimeoutException timeoutException) {
1207 _log.error(
1208 "Unable to load index for company " + _companyId,
1209 timeoutException);
1210 }
1211
1212 private int _clusterNodeAddressesCount;
1213 private long _companyId;
1214 private IndexAccessor _indexAccessor;
1215 private long _localLastGeneration;
1216
1217 }
1218
1219 }