001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.StringPool;
044 import com.liferay.portal.kernel.util.StringUtil;
045 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046 import com.liferay.portal.kernel.util.Validator;
047 import com.liferay.portal.model.CompanyConstants;
048 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050 import com.liferay.portal.security.auth.TransientTokenUtil;
051 import com.liferay.portal.util.PortalInstances;
052 import com.liferay.portal.util.PortalUtil;
053 import com.liferay.portal.util.PropsValues;
054 import com.liferay.util.lucene.KeywordsUtil;
055
056 import java.io.IOException;
057 import java.io.InputStream;
058 import java.io.OutputStream;
059
060 import java.net.InetAddress;
061 import java.net.URL;
062 import java.net.URLConnection;
063
064 import java.util.HashSet;
065 import java.util.List;
066 import java.util.Map;
067 import java.util.Set;
068 import java.util.concurrent.BlockingQueue;
069 import java.util.concurrent.ConcurrentHashMap;
070 import java.util.concurrent.TimeUnit;
071 import java.util.concurrent.TimeoutException;
072
073 import org.apache.commons.lang.time.StopWatch;
074 import org.apache.lucene.analysis.Analyzer;
075 import org.apache.lucene.analysis.TokenStream;
076 import org.apache.lucene.document.Document;
077 import org.apache.lucene.index.IndexReader;
078 import org.apache.lucene.index.Term;
079 import org.apache.lucene.queryParser.QueryParser;
080 import org.apache.lucene.search.BooleanClause;
081 import org.apache.lucene.search.BooleanQuery;
082 import org.apache.lucene.search.IndexSearcher;
083 import org.apache.lucene.search.NumericRangeQuery;
084 import org.apache.lucene.search.Query;
085 import org.apache.lucene.search.TermQuery;
086 import org.apache.lucene.search.TermRangeQuery;
087 import org.apache.lucene.search.WildcardQuery;
088 import org.apache.lucene.search.highlight.Formatter;
089 import org.apache.lucene.search.highlight.Highlighter;
090 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
091 import org.apache.lucene.search.highlight.QueryScorer;
092 import org.apache.lucene.search.highlight.SimpleFragmenter;
093 import org.apache.lucene.search.highlight.WeightedTerm;
094 import org.apache.lucene.util.Version;
095
096
104 public class LuceneHelperImpl implements LuceneHelper {
105
106 public void addDocument(long companyId, Document document)
107 throws IOException {
108
109 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
110
111 indexAccessor.addDocument(document);
112 }
113
114 public void addExactTerm(
115 BooleanQuery booleanQuery, String field, String value) {
116
117 addTerm(booleanQuery, field, value, false);
118 }
119
120 public void addNumericRangeTerm(
121 BooleanQuery booleanQuery, String field, String startValue,
122 String endValue) {
123
124 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
125 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
126 true, true);
127
128 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
129 }
130
131 public void addRangeTerm(
132 BooleanQuery booleanQuery, String field, String startValue,
133 String endValue) {
134
135 boolean includesLower = true;
136
137 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
138 includesLower = false;
139 }
140
141 boolean includesUpper = true;
142
143 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
144 includesUpper = false;
145 }
146
147 TermRangeQuery termRangeQuery = new TermRangeQuery(
148 field, startValue, endValue, includesLower, includesUpper);
149
150 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
151 }
152
153 public void addRequiredTerm(
154 BooleanQuery booleanQuery, String field, String value, boolean like) {
155
156 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
157 }
158
159 public void addRequiredTerm(
160 BooleanQuery booleanQuery, String field, String[] values,
161 boolean like) {
162
163 if (values == null) {
164 return;
165 }
166
167 BooleanQuery query = new BooleanQuery();
168
169 for (String value : values) {
170 addTerm(query, field, value, like);
171 }
172
173 booleanQuery.add(query, BooleanClause.Occur.MUST);
174 }
175
176 public void addTerm(
177 BooleanQuery booleanQuery, String field, String value, boolean like) {
178
179 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
180 }
181
182 public void addTerm(
183 BooleanQuery booleanQuery, String field, String value, boolean like,
184 BooleanClauseOccur booleanClauseOccur) {
185
186 if (Validator.isNull(value)) {
187 return;
188 }
189
190 Analyzer analyzer = getAnalyzer();
191
192 if (analyzer instanceof PerFieldAnalyzer) {
193 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
194
195 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
196
197 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
198 like = true;
199 }
200 }
201
202 if (like) {
203 value = StringUtil.replace(
204 value, StringPool.PERCENT, StringPool.BLANK);
205 }
206
207 try {
208 QueryParser queryParser = new QueryParser(
209 getVersion(), field, analyzer);
210
211 queryParser.setAllowLeadingWildcard(true);
212
213 Query query = null;
214
215 try {
216 if (like) {
217
218
219
220 value = value.toLowerCase(queryParser.getLocale());
221 }
222
223 query = queryParser.parse(value);
224 }
225 catch (Exception e) {
226 query = queryParser.parse(KeywordsUtil.escape(value));
227 }
228
229 BooleanClause.Occur occur = null;
230
231 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
232 occur = BooleanClause.Occur.MUST;
233 }
234 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
235 occur = BooleanClause.Occur.MUST_NOT;
236 }
237 else {
238 occur = BooleanClause.Occur.SHOULD;
239 }
240
241 _includeIfUnique(booleanQuery, query, occur, like);
242 }
243 catch (Exception e) {
244 _log.error(e, e);
245 }
246 }
247
248 public void addTerm(
249 BooleanQuery booleanQuery, String field, String[] values,
250 boolean like) {
251
252 for (String value : values) {
253 addTerm(booleanQuery, field, value, like);
254 }
255 }
256
257 public int countScoredFieldNames(Query query, String[] filedNames) {
258 int count = 0;
259
260 for (String fieldName : filedNames) {
261 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
262 query, false, fieldName);
263
264 if ((weightedTerms.length > 0) &&
265 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
266
267 count++;
268 }
269 }
270
271 return count;
272 }
273
274 public void delete(long companyId) {
275 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
276
277 if (indexAccessor == null) {
278 return;
279 }
280
281 indexAccessor.delete();
282 }
283
284 public void deleteDocuments(long companyId, Term term) throws IOException {
285 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
286
287 if (indexAccessor == null) {
288 return;
289 }
290
291 indexAccessor.deleteDocuments(term);
292 }
293
294 public void dumpIndex(long companyId, OutputStream outputStream)
295 throws IOException {
296
297 long lastGeneration = getLastGeneration(companyId);
298
299 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
300 if (_log.isDebugEnabled()) {
301 _log.debug(
302 "Dump index from cluster is not enabled for " + companyId);
303 }
304
305 return;
306 }
307
308 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
309
310 if (indexAccessor == null) {
311 return;
312 }
313
314 indexAccessor.dumpIndex(outputStream);
315 }
316
317 public Analyzer getAnalyzer() {
318 return _analyzer;
319 }
320
321 public long getLastGeneration(long companyId) {
322 if (!isLoadIndexFromClusterEnabled()) {
323 return IndexAccessor.DEFAULT_LAST_GENERATION;
324 }
325
326 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327
328 if (indexAccessor == null) {
329 return IndexAccessor.DEFAULT_LAST_GENERATION;
330 }
331
332 return indexAccessor.getLastGeneration();
333 }
334
335 public InputStream getLoadIndexesInputStreamFromCluster(
336 long companyId, Address bootupAddress)
337 throws SystemException {
338
339 if (!isLoadIndexFromClusterEnabled()) {
340 return null;
341 }
342
343 InputStream inputStream = null;
344
345 try {
346 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
347 _getBootupClusterNodeObjectValuePair(bootupAddress);
348
349 URL url = bootupClusterNodeObjectValuePair.getValue();
350
351 URLConnection urlConnection = url.openConnection();
352
353 urlConnection.setDoOutput(true);
354
355 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
356 urlConnection.getOutputStream());
357
358 unsyncPrintWriter.write("transientToken=");
359 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
360 unsyncPrintWriter.write("&companyId=");
361 unsyncPrintWriter.write(String.valueOf(companyId));
362
363 unsyncPrintWriter.close();
364
365 inputStream = urlConnection.getInputStream();
366
367 return inputStream;
368 }
369 catch (IOException ioe) {
370 throw new SystemException(ioe);
371 }
372 }
373
374 public Set<String> getQueryTerms(Query query) {
375 String queryString = StringUtil.replace(
376 query.toString(), StringPool.STAR, StringPool.BLANK);
377
378 Query tempQuery = null;
379
380 try {
381 QueryParser queryParser = new QueryParser(
382 getVersion(), StringPool.BLANK, getAnalyzer());
383
384 tempQuery = queryParser.parse(queryString);
385 }
386 catch (Exception e) {
387 if (_log.isWarnEnabled()) {
388 _log.warn("Unable to parse " + queryString);
389 }
390
391 tempQuery = query;
392 }
393
394 WeightedTerm[] weightedTerms = null;
395
396 for (String fieldName : Field.KEYWORDS) {
397 weightedTerms = QueryTermExtractor.getTerms(
398 tempQuery, false, fieldName);
399
400 if (weightedTerms.length > 0) {
401 break;
402 }
403 }
404
405 Set<String> queryTerms = new HashSet<String>();
406
407 for (WeightedTerm weightedTerm : weightedTerms) {
408 queryTerms.add(weightedTerm.getTerm());
409 }
410
411 return queryTerms;
412 }
413
414 public IndexSearcher getSearcher(long companyId, boolean readOnly)
415 throws IOException {
416
417 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
418
419 IndexReader indexReader = IndexReader.open(
420 indexAccessor.getLuceneDir(), readOnly);
421
422 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
423
424 indexSearcher.setDefaultFieldSortScoring(true, true);
425 indexSearcher.setSimilarity(new FieldWeightSimilarity());
426
427 return indexSearcher;
428 }
429
430 public String getSnippet(
431 Query query, String field, String s, int maxNumFragments,
432 int fragmentLength, String fragmentSuffix, Formatter formatter)
433 throws IOException {
434
435 QueryScorer queryScorer = new QueryScorer(query, field);
436
437 Highlighter highlighter = new Highlighter(formatter, queryScorer);
438
439 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
440
441 TokenStream tokenStream = getAnalyzer().tokenStream(
442 field, new UnsyncStringReader(s));
443
444 try {
445 String snippet = highlighter.getBestFragments(
446 tokenStream, s, maxNumFragments, fragmentSuffix);
447
448 if (Validator.isNotNull(snippet) &&
449 !StringUtil.endsWith(snippet, fragmentSuffix) &&
450 !s.equals(snippet)) {
451
452 snippet = snippet.concat(fragmentSuffix);
453 }
454
455 return snippet;
456 }
457 catch (InvalidTokenOffsetsException itoe) {
458 throw new IOException(itoe.getMessage());
459 }
460 }
461
462 public Version getVersion() {
463 return _version;
464 }
465
466 public boolean isLoadIndexFromClusterEnabled() {
467 if (PropsValues.CLUSTER_LINK_ENABLED &&
468 PropsValues.LUCENE_REPLICATE_WRITE) {
469
470 return true;
471 }
472
473 if (_log.isDebugEnabled()) {
474 _log.debug("Load index from cluster is not enabled");
475 }
476
477 return false;
478 }
479
480 public void loadIndex(long companyId, InputStream inputStream)
481 throws IOException {
482
483 if (!isLoadIndexFromClusterEnabled()) {
484 return;
485 }
486
487 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
488
489 if (indexAccessor == null) {
490 if (_log.isInfoEnabled()) {
491 _log.info(
492 "Skip loading Lucene index files for company " + companyId +
493 " in favor of lazy loading");
494 }
495
496 return;
497 }
498
499 StopWatch stopWatch = null;
500
501 if (_log.isInfoEnabled()) {
502 _log.info(
503 "Start loading Lucene index files for company " + companyId);
504
505 stopWatch = new StopWatch();
506
507 stopWatch.start();
508 }
509
510 indexAccessor.loadIndex(inputStream);
511
512 if (_log.isInfoEnabled()) {
513 _log.info(
514 "Finished loading index files for company " + companyId +
515 " in " + stopWatch.getTime() + " ms");
516 }
517 }
518
519 public void loadIndexesFromCluster(long companyId) throws SystemException {
520 if (!isLoadIndexFromClusterEnabled()) {
521 return;
522 }
523
524 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
525
526 if (indexAccessor == null) {
527 return;
528 }
529
530 long localLastGeneration = getLastGeneration(companyId);
531
532 _loadIndexFromCluster(indexAccessor, localLastGeneration);
533 }
534
535 public void setAnalyzer(Analyzer analyzer) {
536 _analyzer = analyzer;
537 }
538
539 public void setVersion(Version version) {
540 _version = version;
541 }
542
543 public void shutdown() {
544 if (_luceneIndexThreadPoolExecutor != null) {
545 _luceneIndexThreadPoolExecutor.shutdownNow();
546
547 try {
548 _luceneIndexThreadPoolExecutor.awaitTermination(
549 60, TimeUnit.SECONDS);
550 }
551 catch (InterruptedException ie) {
552 _log.error("Lucene indexer shutdown interrupted", ie);
553 }
554 }
555
556 if (isLoadIndexFromClusterEnabled()) {
557 ClusterExecutorUtil.removeClusterEventListener(
558 _loadIndexClusterEventListener);
559 }
560
561 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
562 indexAccessor.close();
563 }
564 }
565
566 public void startup(long companyId) {
567 if (PropsValues.INDEX_ON_STARTUP) {
568 if (_log.isInfoEnabled()) {
569 _log.info("Indexing Lucene on startup");
570 }
571
572 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
573
574 if (PropsValues.INDEX_WITH_THREAD) {
575 if (_luceneIndexThreadPoolExecutor == null) {
576
577
578
579
580
581 _luceneIndexThreadPoolExecutor =
582 PortalExecutorManagerUtil.getPortalExecutor(
583 LuceneHelperImpl.class.getName());
584 }
585
586 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
587 }
588 else {
589 luceneIndexer.reindex();
590 }
591 }
592 }
593
594 public void updateDocument(long companyId, Term term, Document document)
595 throws IOException {
596
597 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
598
599 indexAccessor.updateDocument(term, document);
600 }
601
602 private LuceneHelperImpl() {
603 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
604 _luceneIndexThreadPoolExecutor =
605 PortalExecutorManagerUtil.getPortalExecutor(
606 LuceneHelperImpl.class.getName());
607 }
608
609 if (isLoadIndexFromClusterEnabled()) {
610 _loadIndexClusterEventListener =
611 new LoadIndexClusterEventListener();
612
613 ClusterExecutorUtil.addClusterEventListener(
614 _loadIndexClusterEventListener);
615 }
616 }
617
618 private ObjectValuePair<String, URL>
619 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
620 throws SystemException {
621
622 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
623 new MethodHandler(
624 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
625 bootupAddress);
626
627 FutureClusterResponses futureClusterResponses =
628 ClusterExecutorUtil.execute(clusterRequest);
629
630 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
631 futureClusterResponses.getPartialResults();
632
633 try {
634 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
635 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
636
637 String transientToken = (String)clusterNodeResponse.getResult();
638
639 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
640
641 InetAddress inetAddress = clusterNode.getInetAddress();
642
643 String fileName = PortalUtil.getPathContext();
644
645 if (!fileName.endsWith(StringPool.SLASH)) {
646 fileName = fileName.concat(StringPool.SLASH);
647 }
648
649 fileName = fileName.concat("lucene/dump");
650
651 URL url = new URL(
652 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
653 fileName);
654
655 return new ObjectValuePair<String, URL>(transientToken, url);
656 }
657 catch (Exception e) {
658 throw new SystemException(e);
659 }
660 }
661
662 private IndexAccessor _getIndexAccessor(long companyId) {
663 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
664
665 if (indexAccessor != null) {
666 return indexAccessor;
667 }
668
669 synchronized (this) {
670 indexAccessor = _indexAccessors.get(companyId);
671
672 if (indexAccessor == null) {
673 indexAccessor = new IndexAccessorImpl(companyId);
674
675 if (isLoadIndexFromClusterEnabled()) {
676 boolean clusterForwardMessage = GetterUtil.getBoolean(
677 MessageValuesThreadLocal.getValue(
678 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
679
680 if (clusterForwardMessage) {
681 if (_log.isInfoEnabled()) {
682 _log.info(
683 "Skip Luncene index files cluster loading " +
684 "since this is a manual reindex request");
685 }
686 }
687 else {
688 indexAccessor = new SynchronizedIndexAccessorImpl(
689 indexAccessor);
690
691 try {
692 _loadIndexFromCluster(
693 indexAccessor,
694 indexAccessor.getLastGeneration());
695 }
696 catch (Exception e) {
697 _log.error(
698 "Unable to load index for company " +
699 indexAccessor.getCompanyId(),
700 e);
701 }
702 }
703 }
704
705 _indexAccessors.put(companyId, indexAccessor);
706 }
707 }
708
709 return indexAccessor;
710 }
711
712 private void _includeIfUnique(
713 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
714 boolean like) {
715
716 if (query instanceof TermQuery) {
717 Set<Term> terms = new HashSet<Term>();
718
719 query.extractTerms(terms);
720
721 for (Term term : terms) {
722 String termValue = term.text();
723
724 if (like) {
725 term = term.createTerm(
726 StringPool.STAR.concat(termValue).concat(
727 StringPool.STAR));
728
729 query = new WildcardQuery(term);
730 }
731 else {
732 query = new TermQuery(term);
733 }
734
735 boolean included = false;
736
737 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
738 if (query.equals(booleanClause.getQuery())) {
739 included = true;
740 }
741 }
742
743 if (!included) {
744 booleanQuery.add(query, occur);
745 }
746 }
747 }
748 else if (query instanceof BooleanQuery) {
749 BooleanQuery curBooleanQuery = (BooleanQuery)query;
750
751 BooleanQuery containerBooleanQuery = new BooleanQuery();
752
753 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
754 _includeIfUnique(
755 containerBooleanQuery, booleanClause.getQuery(),
756 booleanClause.getOccur(), like);
757 }
758
759 if (containerBooleanQuery.getClauses().length > 0) {
760 booleanQuery.add(containerBooleanQuery, occur);
761 }
762 }
763 else {
764 boolean included = false;
765
766 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
767 if (query.equals(booleanClause.getQuery())) {
768 included = true;
769 }
770 }
771
772 if (!included) {
773 booleanQuery.add(query, occur);
774 }
775 }
776 }
777
778 private void _loadIndexFromCluster(
779 IndexAccessor indexAccessor, long localLastGeneration)
780 throws SystemException {
781
782 List<Address> clusterNodeAddresses =
783 ClusterExecutorUtil.getClusterNodeAddresses();
784
785 int clusterNodeAddressesCount = clusterNodeAddresses.size();
786
787 if (clusterNodeAddressesCount <= 1) {
788 if (_log.isDebugEnabled()) {
789 _log.debug(
790 "Do not load indexes because there is either one portal " +
791 "instance or no portal instances in the cluster");
792 }
793
794 return;
795 }
796
797 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
798 new MethodHandler(
799 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
800 true);
801
802 ClusterExecutorUtil.execute(
803 clusterRequest,
804 new LoadIndexClusterResponseCallback(
805 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
806 }
807
808 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
809
810 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
811
812 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
813
814 private static MethodKey _createTokenMethodKey =
815 new MethodKey(TransientTokenUtil.class, "createToken", long.class);
816 private static MethodKey _getLastGenerationMethodKey =
817 new MethodKey(LuceneHelperUtil.class, "getLastGeneration", long.class);
818
819 private Analyzer _analyzer;
820 private Map<Long, IndexAccessor> _indexAccessors =
821 new ConcurrentHashMap<Long, IndexAccessor>();
822 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
823 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
824 private Version _version;
825
826 private class LoadIndexClusterEventListener
827 implements ClusterEventListener {
828
829 public void processClusterEvent(ClusterEvent clusterEvent) {
830 ClusterEventType clusterEventType =
831 clusterEvent.getClusterEventType();
832
833 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
834 return;
835 }
836
837 List<Address> clusterNodeAddresses =
838 ClusterExecutorUtil.getClusterNodeAddresses();
839 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
840
841 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
842 if (_log.isDebugEnabled()) {
843 _log.debug(
844 "Number of original cluster members is greater than " +
845 "one");
846 }
847
848 return;
849 }
850
851 long[] companyIds = PortalInstances.getCompanyIds();
852
853 for (long companyId : companyIds) {
854 loadIndexes(companyId);
855 }
856
857 loadIndexes(CompanyConstants.SYSTEM);
858 }
859
860 private void loadIndexes(long companyId) {
861 long lastGeneration = getLastGeneration(companyId);
862
863 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
864 return;
865 }
866
867 try {
868 LuceneClusterUtil.loadIndexesFromCluster(companyId);
869 }
870 catch (Exception e) {
871 _log.error(
872 "Unable to load indexes for company " + companyId, e);
873 }
874 }
875
876 }
877
878 private class LoadIndexClusterResponseCallback
879 extends BaseClusterResponseCallback {
880
881 public LoadIndexClusterResponseCallback(
882 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
883 long localLastGeneration) {
884
885 _indexAccessor = indexAccessor;
886 _clusterNodeAddressesCount = clusterNodeAddressesCount;
887 _localLastGeneration = localLastGeneration;
888
889 _companyId = _indexAccessor.getCompanyId();
890 }
891
892 @Override
893 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
894 Address bootupAddress = null;
895
896 do {
897 _clusterNodeAddressesCount--;
898
899 ClusterNodeResponse clusterNodeResponse = null;
900
901 try {
902 clusterNodeResponse = blockingQueue.poll(
903 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
904 TimeUnit.MILLISECONDS);
905 }
906 catch (Exception e) {
907 _log.error("Unable to get cluster node response", e);
908 }
909
910 if (clusterNodeResponse == null) {
911 if (_log.isDebugEnabled()) {
912 _log.debug(
913 "Unable to get cluster node response in " +
914 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
915 TimeUnit.MILLISECONDS);
916 }
917
918 continue;
919 }
920
921 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
922
923 if (clusterNode.getPort() > 0) {
924 try {
925 long remoteLastGeneration =
926 (Long)clusterNodeResponse.getResult();
927
928 if (remoteLastGeneration > _localLastGeneration) {
929 bootupAddress = clusterNodeResponse.getAddress();
930
931 break;
932 }
933 }
934 catch (Exception e) {
935 if (_log.isDebugEnabled()) {
936 _log.debug(
937 "Suppress exception caused by remote method " +
938 "invocation",
939 e);
940 }
941
942 continue;
943 }
944 }
945 else {
946 if (_log.isDebugEnabled()) {
947 _log.debug(
948 "Cluster node " + clusterNode +
949 " has invalid port");
950 }
951 }
952 }
953 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
954
955 if (bootupAddress == null) {
956 return;
957 }
958
959 if (_log.isInfoEnabled()) {
960 _log.info(
961 "Start loading lucene index files from cluster node " +
962 bootupAddress);
963 }
964
965 InputStream inputStream = null;
966
967 try {
968 inputStream = getLoadIndexesInputStreamFromCluster(
969 _companyId, bootupAddress);
970
971 _indexAccessor.loadIndex(inputStream);
972
973 if (_log.isInfoEnabled()) {
974 _log.info("Lucene index files loaded successfully");
975 }
976 }
977 catch (Exception e) {
978 _log.error("Unable to load index for company " + _companyId, e);
979 }
980 finally {
981 if (inputStream != null) {
982 try {
983 inputStream.close();
984 }
985 catch (IOException ioe) {
986 _log.error(
987 "Unable to close input stream for company " +
988 _companyId,
989 ioe);
990 }
991 }
992 }
993 }
994
995 @Override
996 public void processTimeoutException(TimeoutException timeoutException) {
997 _log.error(
998 "Uanble to load index for company " + _companyId,
999 timeoutException);
1000 }
1001
1002 private int _clusterNodeAddressesCount;
1003 private long _companyId;
1004 private IndexAccessor _indexAccessor;
1005 private long _localLastGeneration;
1006
1007 }
1008
1009 }