001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLink;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.StringPool;
044 import com.liferay.portal.kernel.util.StringUtil;
045 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046 import com.liferay.portal.kernel.util.Validator;
047 import com.liferay.portal.model.CompanyConstants;
048 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050 import com.liferay.portal.security.auth.TransientTokenUtil;
051 import com.liferay.portal.util.PortalInstances;
052 import com.liferay.portal.util.PortalUtil;
053 import com.liferay.portal.util.PropsValues;
054 import com.liferay.util.lucene.KeywordsUtil;
055
056 import java.io.IOException;
057 import java.io.InputStream;
058 import java.io.OutputStream;
059
060 import java.net.InetAddress;
061 import java.net.URL;
062 import java.net.URLConnection;
063
064 import java.util.HashSet;
065 import java.util.List;
066 import java.util.Map;
067 import java.util.Set;
068 import java.util.concurrent.BlockingQueue;
069 import java.util.concurrent.ConcurrentHashMap;
070 import java.util.concurrent.TimeUnit;
071 import java.util.concurrent.TimeoutException;
072
073 import org.apache.commons.lang.time.StopWatch;
074 import org.apache.lucene.analysis.Analyzer;
075 import org.apache.lucene.analysis.TokenStream;
076 import org.apache.lucene.document.Document;
077 import org.apache.lucene.index.IndexReader;
078 import org.apache.lucene.index.Term;
079 import org.apache.lucene.queryParser.QueryParser;
080 import org.apache.lucene.search.BooleanClause;
081 import org.apache.lucene.search.BooleanQuery;
082 import org.apache.lucene.search.IndexSearcher;
083 import org.apache.lucene.search.NumericRangeQuery;
084 import org.apache.lucene.search.Query;
085 import org.apache.lucene.search.TermQuery;
086 import org.apache.lucene.search.TermRangeQuery;
087 import org.apache.lucene.search.WildcardQuery;
088 import org.apache.lucene.search.highlight.Formatter;
089 import org.apache.lucene.search.highlight.Highlighter;
090 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
091 import org.apache.lucene.search.highlight.QueryScorer;
092 import org.apache.lucene.search.highlight.SimpleFragmenter;
093 import org.apache.lucene.search.highlight.WeightedTerm;
094 import org.apache.lucene.util.Version;
095
096
104 public class LuceneHelperImpl implements LuceneHelper {
105
106 public void addDocument(long companyId, Document document)
107 throws IOException {
108
109 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
110
111 indexAccessor.addDocument(document);
112 }
113
114 public void addExactTerm(
115 BooleanQuery booleanQuery, String field, String value) {
116
117 addTerm(booleanQuery, field, value, false);
118 }
119
120 public void addNumericRangeTerm(
121 BooleanQuery booleanQuery, String field, String startValue,
122 String endValue) {
123
124 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
125 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
126 true, true);
127
128 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
129 }
130
131 public void addRangeTerm(
132 BooleanQuery booleanQuery, String field, String startValue,
133 String endValue) {
134
135 boolean includesLower = true;
136
137 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
138 includesLower = false;
139 }
140
141 boolean includesUpper = true;
142
143 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
144 includesUpper = false;
145 }
146
147 TermRangeQuery termRangeQuery = new TermRangeQuery(
148 field, startValue, endValue, includesLower, includesUpper);
149
150 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
151 }
152
153 public void addRequiredTerm(
154 BooleanQuery booleanQuery, String field, String value, boolean like) {
155
156 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
157 }
158
159 public void addRequiredTerm(
160 BooleanQuery booleanQuery, String field, String[] values,
161 boolean like) {
162
163 if (values == null) {
164 return;
165 }
166
167 BooleanQuery query = new BooleanQuery();
168
169 for (String value : values) {
170 addTerm(query, field, value, like);
171 }
172
173 booleanQuery.add(query, BooleanClause.Occur.MUST);
174 }
175
176 public void addTerm(
177 BooleanQuery booleanQuery, String field, String value, boolean like) {
178
179 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
180 }
181
182 public void addTerm(
183 BooleanQuery booleanQuery, String field, String value, boolean like,
184 BooleanClauseOccur booleanClauseOccur) {
185
186 if (Validator.isNull(value)) {
187 return;
188 }
189
190 Analyzer analyzer = getAnalyzer();
191
192 if (analyzer instanceof PerFieldAnalyzer) {
193 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
194
195 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
196
197 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
198 like = true;
199 }
200 }
201
202 if (like) {
203 value = StringUtil.replace(
204 value, StringPool.PERCENT, StringPool.BLANK);
205 }
206
207 try {
208 QueryParser queryParser = new QueryParser(
209 getVersion(), field, analyzer);
210
211 Query query = null;
212
213 try {
214 if (like) {
215
216
217
218 value = value.toLowerCase(queryParser.getLocale());
219 }
220
221 query = queryParser.parse(value);
222 }
223 catch (Exception e) {
224 query = queryParser.parse(KeywordsUtil.escape(value));
225 }
226
227 BooleanClause.Occur occur = null;
228
229 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
230 occur = BooleanClause.Occur.MUST;
231 }
232 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
233 occur = BooleanClause.Occur.MUST_NOT;
234 }
235 else {
236 occur = BooleanClause.Occur.SHOULD;
237 }
238
239 _includeIfUnique(booleanQuery, query, occur, like);
240 }
241 catch (Exception e) {
242 _log.error(e, e);
243 }
244 }
245
246 public void addTerm(
247 BooleanQuery booleanQuery, String field, String[] values,
248 boolean like) {
249
250 for (String value : values) {
251 addTerm(booleanQuery, field, value, like);
252 }
253 }
254
255 public int countScoredFieldNames(Query query, String[] filedNames) {
256 int count = 0;
257
258 for (String fieldName : filedNames) {
259 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
260 query, false, fieldName);
261
262 if ((weightedTerms.length > 0) &&
263 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
264
265 count++;
266 }
267 }
268
269 return count;
270 }
271
272 public void delete(long companyId) {
273 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
274
275 if (indexAccessor == null) {
276 return;
277 }
278
279 indexAccessor.delete();
280 }
281
282 public void deleteDocuments(long companyId, Term term) throws IOException {
283 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
284
285 if (indexAccessor == null) {
286 return;
287 }
288
289 indexAccessor.deleteDocuments(term);
290 }
291
292 public void dumpIndex(long companyId, OutputStream outputStream)
293 throws IOException {
294
295 long lastGeneration = getLastGeneration(companyId);
296
297 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
298 if (_log.isDebugEnabled()) {
299 _log.debug(
300 "Dump index from cluster is not enabled for " + companyId);
301 }
302
303 return;
304 }
305
306 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
307
308 if (indexAccessor == null) {
309 return;
310 }
311
312 indexAccessor.dumpIndex(outputStream);
313 }
314
315 public Analyzer getAnalyzer() {
316 return _analyzer;
317 }
318
319 public long getLastGeneration(long companyId) {
320 if (!isLoadIndexFromClusterEnabled()) {
321 return IndexAccessor.DEFAULT_LAST_GENERATION;
322 }
323
324 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
325
326 if (indexAccessor == null) {
327 return IndexAccessor.DEFAULT_LAST_GENERATION;
328 }
329
330 return indexAccessor.getLastGeneration();
331 }
332
333 public InputStream getLoadIndexesInputStreamFromCluster(
334 long companyId, Address bootupAddress)
335 throws SystemException {
336
337 if (!isLoadIndexFromClusterEnabled()) {
338 return null;
339 }
340
341 InputStream inputStream = null;
342
343 try {
344 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
345 _getBootupClusterNodeObjectValuePair(bootupAddress);
346
347 URL url = bootupClusterNodeObjectValuePair.getValue();
348
349 URLConnection urlConnection = url.openConnection();
350
351 urlConnection.setDoOutput(true);
352
353 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
354 urlConnection.getOutputStream());
355
356 unsyncPrintWriter.write("transientToken=");
357 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
358 unsyncPrintWriter.write("&companyId=");
359 unsyncPrintWriter.write(String.valueOf(companyId));
360
361 unsyncPrintWriter.close();
362
363 inputStream = urlConnection.getInputStream();
364
365 return inputStream;
366 }
367 catch (IOException ioe) {
368 throw new SystemException(ioe);
369 }
370 }
371
372 public Set<String> getQueryTerms(Query query) {
373 String queryString = StringUtil.replace(
374 query.toString(), StringPool.STAR, StringPool.BLANK);
375
376 Query tempQuery = null;
377
378 try {
379 QueryParser queryParser = new QueryParser(
380 getVersion(), StringPool.BLANK, getAnalyzer());
381
382 tempQuery = queryParser.parse(queryString);
383 }
384 catch (Exception e) {
385 if (_log.isWarnEnabled()) {
386 _log.warn("Unable to parse " + queryString);
387 }
388
389 tempQuery = query;
390 }
391
392 WeightedTerm[] weightedTerms = null;
393
394 for (String fieldName : Field.KEYWORDS) {
395 weightedTerms = QueryTermExtractor.getTerms(
396 tempQuery, false, fieldName);
397
398 if (weightedTerms.length > 0) {
399 break;
400 }
401 }
402
403 Set<String> queryTerms = new HashSet<String>();
404
405 for (WeightedTerm weightedTerm : weightedTerms) {
406 queryTerms.add(weightedTerm.getTerm());
407 }
408
409 return queryTerms;
410 }
411
412 public IndexSearcher getSearcher(long companyId, boolean readOnly)
413 throws IOException {
414
415 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
416
417 IndexReader indexReader = IndexReader.open(
418 indexAccessor.getLuceneDir(), readOnly);
419
420 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
421
422 indexSearcher.setDefaultFieldSortScoring(true, true);
423 indexSearcher.setSimilarity(new FieldWeightSimilarity());
424
425 return indexSearcher;
426 }
427
428 public String getSnippet(
429 Query query, String field, String s, int maxNumFragments,
430 int fragmentLength, String fragmentSuffix, Formatter formatter)
431 throws IOException {
432
433 QueryScorer queryScorer = new QueryScorer(query, field);
434
435 Highlighter highlighter = new Highlighter(formatter, queryScorer);
436
437 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
438
439 TokenStream tokenStream = getAnalyzer().tokenStream(
440 field, new UnsyncStringReader(s));
441
442 try {
443 String snippet = highlighter.getBestFragments(
444 tokenStream, s, maxNumFragments, fragmentSuffix);
445
446 if (Validator.isNotNull(snippet) &&
447 !StringUtil.endsWith(snippet, fragmentSuffix) &&
448 !s.equals(snippet)) {
449
450 snippet = snippet.concat(fragmentSuffix);
451 }
452
453 return snippet;
454 }
455 catch (InvalidTokenOffsetsException itoe) {
456 throw new IOException(itoe.getMessage());
457 }
458 }
459
460 public Version getVersion() {
461 return _version;
462 }
463
464 public boolean isLoadIndexFromClusterEnabled() {
465 if (PropsValues.CLUSTER_LINK_ENABLED &&
466 PropsValues.LUCENE_REPLICATE_WRITE) {
467
468 return true;
469 }
470
471 if (_log.isDebugEnabled()) {
472 _log.debug("Load index from cluster is not enabled");
473 }
474
475 return false;
476 }
477
478 public void loadIndex(long companyId, InputStream inputStream)
479 throws IOException {
480
481 if (!isLoadIndexFromClusterEnabled()) {
482 return;
483 }
484
485 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
486
487 if (indexAccessor == null) {
488 if (_log.isInfoEnabled()) {
489 _log.info(
490 "Skip loading Lucene index files for company " + companyId +
491 " in favor of lazy loading");
492 }
493
494 return;
495 }
496
497 StopWatch stopWatch = null;
498
499 if (_log.isInfoEnabled()) {
500 _log.info(
501 "Start loading Lucene index files for company " + companyId);
502
503 stopWatch = new StopWatch();
504
505 stopWatch.start();
506 }
507
508 indexAccessor.loadIndex(inputStream);
509
510 if (_log.isInfoEnabled()) {
511 _log.info(
512 "Finished loading index files for company " + companyId +
513 " in " + stopWatch.getTime() + " ms");
514 }
515 }
516
517 public void loadIndexesFromCluster(long companyId) throws SystemException {
518 if (!isLoadIndexFromClusterEnabled()) {
519 return;
520 }
521
522 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
523
524 if (indexAccessor == null) {
525 return;
526 }
527
528 long localLastGeneration = getLastGeneration(companyId);
529
530 _loadIndexFromCluster(indexAccessor, localLastGeneration);
531 }
532
533 public void setAnalyzer(Analyzer analyzer) {
534 _analyzer = analyzer;
535 }
536
537 public void setVersion(Version version) {
538 _version = version;
539 }
540
541 public void shutdown() {
542 if (_luceneIndexThreadPoolExecutor != null) {
543 _luceneIndexThreadPoolExecutor.shutdownNow();
544
545 try {
546 _luceneIndexThreadPoolExecutor.awaitTermination(
547 60, TimeUnit.SECONDS);
548 }
549 catch (InterruptedException ie) {
550 _log.error("Lucene indexer shutdown interrupted", ie);
551 }
552 }
553
554 if (isLoadIndexFromClusterEnabled()) {
555 ClusterExecutorUtil.removeClusterEventListener(
556 _loadIndexClusterEventListener);
557 }
558
559 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
560 indexAccessor.close();
561 }
562 }
563
564 public void startup(long companyId) {
565 if (PropsValues.INDEX_ON_STARTUP) {
566 if (_log.isInfoEnabled()) {
567 _log.info("Indexing Lucene on startup");
568 }
569
570 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
571
572 if (PropsValues.INDEX_WITH_THREAD) {
573 if (_luceneIndexThreadPoolExecutor == null) {
574
575
576
577
578
579 _luceneIndexThreadPoolExecutor =
580 PortalExecutorManagerUtil.getPortalExecutor(
581 LuceneHelperImpl.class.getName());
582 }
583
584 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
585 }
586 else {
587 luceneIndexer.reindex();
588 }
589 }
590 }
591
592 public void updateDocument(long companyId, Term term, Document document)
593 throws IOException {
594
595 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
596
597 indexAccessor.updateDocument(term, document);
598 }
599
/**
 * Creates the helper.
 *
 * <p>
 * The indexing executor is looked up right away when both
 * {@code PropsValues.INDEX_ON_STARTUP} and
 * {@code PropsValues.INDEX_WITH_THREAD} are set. When loading from the
 * cluster is enabled, a cluster event listener is created and registered.
 * </p>
 */
private LuceneHelperImpl() {
	boolean indexWithThreadOnStartup =
		PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

	if (indexWithThreadOnStartup) {
		_luceneIndexThreadPoolExecutor =
			PortalExecutorManagerUtil.getPortalExecutor(
				LuceneHelperImpl.class.getName());
	}

	if (!isLoadIndexFromClusterEnabled()) {
		return;
	}

	_loadIndexClusterEventListener = new LoadIndexClusterEventListener();

	ClusterExecutorUtil.addClusterEventListener(
		_loadIndexClusterEventListener);
}
615
616 private ObjectValuePair<String, URL>
617 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
618 throws SystemException {
619
620 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
621 new MethodHandler(
622 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
623 bootupAddress);
624
625 FutureClusterResponses futureClusterResponses =
626 ClusterExecutorUtil.execute(clusterRequest);
627
628 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
629 futureClusterResponses.getPartialResults();
630
631 try {
632 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
633 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
634
635 String transientToken = (String)clusterNodeResponse.getResult();
636
637 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
638
639 InetAddress inetAddress = clusterNode.getInetAddress();
640
641 String fileName = PortalUtil.getPathContext();
642
643 if (!fileName.endsWith(StringPool.SLASH)) {
644 fileName = fileName.concat(StringPool.SLASH);
645 }
646
647 fileName = fileName.concat("lucene/dump");
648
649 URL url = new URL(
650 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
651 fileName);
652
653 return new ObjectValuePair<String, URL>(transientToken, url);
654 }
655 catch (Exception e) {
656 throw new SystemException(e);
657 }
658 }
659
660 private IndexAccessor _getIndexAccessor(long companyId) {
661 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
662
663 if (indexAccessor != null) {
664 return indexAccessor;
665 }
666
667 synchronized (this) {
668 indexAccessor = _indexAccessors.get(companyId);
669
670 if (indexAccessor == null) {
671 indexAccessor = new IndexAccessorImpl(companyId);
672
673 if (isLoadIndexFromClusterEnabled()) {
674 boolean clusterForwardMessage = GetterUtil.getBoolean(
675 MessageValuesThreadLocal.getValue(
676 ClusterLink.CLUSTER_FORWARD_MESSAGE));
677
678 if (clusterForwardMessage) {
679 if (_log.isInfoEnabled()) {
680 _log.info(
681 "Skip Luncene index files cluster loading " +
682 "since this is a manual reindex request");
683 }
684 }
685 else {
686 indexAccessor = new SynchronizedIndexAccessorImpl(
687 indexAccessor);
688
689 try {
690 _loadIndexFromCluster(
691 indexAccessor,
692 indexAccessor.getLastGeneration());
693 }
694 catch (Exception e) {
695 _log.error(
696 "Unable to load index for company " +
697 indexAccessor.getCompanyId(),
698 e);
699 }
700 }
701 }
702
703 _indexAccessors.put(companyId, indexAccessor);
704 }
705 }
706
707 return indexAccessor;
708 }
709
710 private void _includeIfUnique(
711 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
712 boolean like) {
713
714 if (query instanceof TermQuery) {
715 Set<Term> terms = new HashSet<Term>();
716
717 query.extractTerms(terms);
718
719 for (Term term : terms) {
720 String termValue = term.text();
721
722 if (like) {
723 term = term.createTerm(
724 StringPool.STAR.concat(termValue).concat(
725 StringPool.STAR));
726
727 query = new WildcardQuery(term);
728 }
729 else {
730 query = new TermQuery(term);
731 }
732
733 boolean included = false;
734
735 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
736 if (query.equals(booleanClause.getQuery())) {
737 included = true;
738 }
739 }
740
741 if (!included) {
742 booleanQuery.add(query, occur);
743 }
744 }
745 }
746 else if (query instanceof BooleanQuery) {
747 BooleanQuery curBooleanQuery = (BooleanQuery)query;
748
749 BooleanQuery containerBooleanQuery = new BooleanQuery();
750
751 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
752 _includeIfUnique(
753 containerBooleanQuery, booleanClause.getQuery(),
754 booleanClause.getOccur(), like);
755 }
756
757 if (containerBooleanQuery.getClauses().length > 0) {
758 booleanQuery.add(containerBooleanQuery, occur);
759 }
760 }
761 else {
762 boolean included = false;
763
764 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
765 if (query.equals(booleanClause.getQuery())) {
766 included = true;
767 }
768 }
769
770 if (!included) {
771 booleanQuery.add(query, occur);
772 }
773 }
774 }
775
776 private void _loadIndexFromCluster(
777 IndexAccessor indexAccessor, long localLastGeneration)
778 throws SystemException {
779
780 List<Address> clusterNodeAddresses =
781 ClusterExecutorUtil.getClusterNodeAddresses();
782
783 int clusterNodeAddressesCount = clusterNodeAddresses.size();
784
785 if (clusterNodeAddressesCount <= 1) {
786 if (_log.isDebugEnabled()) {
787 _log.debug(
788 "Do not load indexes because there is either one portal " +
789 "instance or no portal instances in the cluster");
790 }
791
792 return;
793 }
794
795 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
796 new MethodHandler(
797 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
798 true);
799
800 ClusterExecutorUtil.execute(
801 clusterRequest,
802 new LoadIndexClusterResponseCallback(
803 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
804 }
805
806 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
807
808 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
809
810 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
811
812 private static MethodKey _createTokenMethodKey = new MethodKey(
813 TransientTokenUtil.class, "createToken", long.class);
814 private static MethodKey _getLastGenerationMethodKey = new MethodKey(
815 LuceneHelperUtil.class, "getLastGeneration", long.class);
816
817 private Analyzer _analyzer;
818 private Map<Long, IndexAccessor> _indexAccessors =
819 new ConcurrentHashMap<Long, IndexAccessor>();
820 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
821 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
822 private Version _version;
823
824 private class LoadIndexClusterEventListener
825 implements ClusterEventListener {
826
827 public void processClusterEvent(ClusterEvent clusterEvent) {
828 ClusterEventType clusterEventType =
829 clusterEvent.getClusterEventType();
830
831 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
832 return;
833 }
834
835 List<Address> clusterNodeAddresses =
836 ClusterExecutorUtil.getClusterNodeAddresses();
837 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
838
839 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
840 if (_log.isDebugEnabled()) {
841 _log.debug(
842 "Number of original cluster members is greater than " +
843 "one");
844 }
845
846 return;
847 }
848
849 long[] companyIds = PortalInstances.getCompanyIds();
850
851 for (long companyId : companyIds) {
852 loadIndexes(companyId);
853 }
854
855 loadIndexes(CompanyConstants.SYSTEM);
856 }
857
858 private void loadIndexes(long companyId) {
859 long lastGeneration = getLastGeneration(companyId);
860
861 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
862 return;
863 }
864
865 try {
866 LuceneClusterUtil.loadIndexesFromCluster(companyId);
867 }
868 catch (Exception e) {
869 _log.error(
870 "Unable to load indexes for company " + companyId, e);
871 }
872 }
873
874 }
875
876 private class LoadIndexClusterResponseCallback
877 extends BaseClusterResponseCallback {
878
879 public LoadIndexClusterResponseCallback(
880 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
881 long localLastGeneration) {
882
883 _indexAccessor = indexAccessor;
884 _clusterNodeAddressesCount = clusterNodeAddressesCount;
885 _localLastGeneration = localLastGeneration;
886
887 _companyId = _indexAccessor.getCompanyId();
888 }
889
890 @Override
891 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
892 Address bootupAddress = null;
893
894 do {
895 _clusterNodeAddressesCount--;
896
897 ClusterNodeResponse clusterNodeResponse = null;
898
899 try {
900 clusterNodeResponse = blockingQueue.poll(
901 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
902 TimeUnit.MILLISECONDS);
903 }
904 catch (Exception e) {
905 _log.error("Unable to get cluster node response", e);
906 }
907
908 if (clusterNodeResponse == null) {
909 if (_log.isDebugEnabled()) {
910 _log.debug(
911 "Unable to get cluster node response in " +
912 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
913 TimeUnit.MILLISECONDS);
914 }
915
916 continue;
917 }
918
919 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
920
921 if (clusterNode.getPort() > 0) {
922 try {
923 long remoteLastGeneration =
924 (Long)clusterNodeResponse.getResult();
925
926 if (remoteLastGeneration > _localLastGeneration) {
927 bootupAddress = clusterNodeResponse.getAddress();
928
929 break;
930 }
931 }
932 catch (Exception e) {
933 if (_log.isDebugEnabled()) {
934 _log.debug(
935 "Suppress exception caused by remote method " +
936 "invocation",
937 e);
938 }
939
940 continue;
941 }
942 }
943 else {
944 if (_log.isDebugEnabled()) {
945 _log.debug(
946 "Cluster node " + clusterNode +
947 " has invalid port");
948 }
949 }
950 }
951 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
952
953 if (bootupAddress == null) {
954 return;
955 }
956
957 if (_log.isInfoEnabled()) {
958 _log.info(
959 "Start loading lucene index files from cluster node " +
960 bootupAddress);
961 }
962
963 InputStream inputStream = null;
964
965 try {
966 inputStream = getLoadIndexesInputStreamFromCluster(
967 _companyId, bootupAddress);
968
969 _indexAccessor.loadIndex(inputStream);
970
971 if (_log.isInfoEnabled()) {
972 _log.info("Lucene index files loaded successfully");
973 }
974 }
975 catch (Exception e) {
976 _log.error("Unable to load index for company " + _companyId, e);
977 }
978 finally {
979 if (inputStream != null) {
980 try {
981 inputStream.close();
982 }
983 catch (IOException ioe) {
984 _log.error(
985 "Unable to close input stream for company " +
986 _companyId,
987 ioe);
988 }
989 }
990 }
991 }
992
993 @Override
994 public void processTimeoutException(TimeoutException timeoutException) {
995 _log.error(
996 "Uanble to load index for company " + _companyId,
997 timeoutException);
998 }
999
1000 private int _clusterNodeAddressesCount;
1001 private long _companyId;
1002 private IndexAccessor _indexAccessor;
1003 private long _localLastGeneration;
1004
1005 }
1006
1007 }