001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.StringPool;
044 import com.liferay.portal.kernel.util.StringUtil;
045 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046 import com.liferay.portal.kernel.util.Validator;
047 import com.liferay.portal.model.CompanyConstants;
048 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050 import com.liferay.portal.security.auth.TransientTokenUtil;
051 import com.liferay.portal.util.PortalInstances;
052 import com.liferay.portal.util.PropsValues;
053 import com.liferay.util.lucene.KeywordsUtil;
054
055 import java.io.IOException;
056 import java.io.InputStream;
057 import java.io.OutputStream;
058
059 import java.net.InetAddress;
060 import java.net.URL;
061 import java.net.URLConnection;
062
063 import java.util.HashSet;
064 import java.util.List;
065 import java.util.Map;
066 import java.util.Set;
067 import java.util.concurrent.BlockingQueue;
068 import java.util.concurrent.ConcurrentHashMap;
069 import java.util.concurrent.TimeUnit;
070 import java.util.concurrent.TimeoutException;
071
072 import org.apache.commons.lang.time.StopWatch;
073 import org.apache.lucene.analysis.Analyzer;
074 import org.apache.lucene.analysis.TokenStream;
075 import org.apache.lucene.document.Document;
076 import org.apache.lucene.index.Term;
077 import org.apache.lucene.queryParser.QueryParser;
078 import org.apache.lucene.search.BooleanClause;
079 import org.apache.lucene.search.BooleanQuery;
080 import org.apache.lucene.search.IndexSearcher;
081 import org.apache.lucene.search.NumericRangeQuery;
082 import org.apache.lucene.search.Query;
083 import org.apache.lucene.search.TermQuery;
084 import org.apache.lucene.search.TermRangeQuery;
085 import org.apache.lucene.search.WildcardQuery;
086 import org.apache.lucene.search.highlight.Highlighter;
087 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
088 import org.apache.lucene.search.highlight.QueryScorer;
089 import org.apache.lucene.search.highlight.SimpleFragmenter;
090 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
091 import org.apache.lucene.search.highlight.WeightedTerm;
092 import org.apache.lucene.util.Version;
093
094
102 public class LuceneHelperImpl implements LuceneHelper {
103
104 public void addDocument(long companyId, Document document)
105 throws IOException {
106
107 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
108
109 indexAccessor.addDocument(document);
110 }
111
112 public void addExactTerm(
113 BooleanQuery booleanQuery, String field, String value) {
114
115 addTerm(booleanQuery, field, value, false);
116 }
117
118 public void addNumericRangeTerm(
119 BooleanQuery booleanQuery, String field, String startValue,
120 String endValue) {
121
122 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
123 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
124 true, true);
125
126 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
127 }
128
129 public void addRangeTerm(
130 BooleanQuery booleanQuery, String field, String startValue,
131 String endValue) {
132
133 boolean includesLower = true;
134
135 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
136 includesLower = false;
137 }
138
139 boolean includesUpper = true;
140
141 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
142 includesUpper = false;
143 }
144
145 TermRangeQuery termRangeQuery = new TermRangeQuery(
146 field, startValue, endValue, includesLower, includesUpper);
147
148 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
149 }
150
151 public void addRequiredTerm(
152 BooleanQuery booleanQuery, String field, String value, boolean like) {
153
154 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
155 }
156
157 public void addRequiredTerm(
158 BooleanQuery booleanQuery, String field, String[] values,
159 boolean like) {
160
161 if (values == null) {
162 return;
163 }
164
165 BooleanQuery query = new BooleanQuery();
166
167 for (String value : values) {
168 addTerm(query, field, value, like);
169 }
170
171 booleanQuery.add(query, BooleanClause.Occur.MUST);
172 }
173
174 public void addTerm(
175 BooleanQuery booleanQuery, String field, String value, boolean like) {
176
177 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
178 }
179
180 public void addTerm(
181 BooleanQuery booleanQuery, String field, String value, boolean like,
182 BooleanClauseOccur booleanClauseOccur) {
183
184 if (Validator.isNull(value)) {
185 return;
186 }
187
188 Analyzer analyzer = getAnalyzer();
189
190 if (analyzer instanceof PerFieldAnalyzerWrapper) {
191 PerFieldAnalyzerWrapper perFieldAnalyzerWrapper =
192 (PerFieldAnalyzerWrapper)analyzer;
193
194 Analyzer fieldAnalyzer = perFieldAnalyzerWrapper.getAnalyzer(field);
195
196 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
197 like = true;
198 }
199 }
200
201 if (like) {
202 value = StringUtil.replace(
203 value, StringPool.PERCENT, StringPool.BLANK);
204 }
205
206 try {
207 QueryParser queryParser = new QueryParser(
208 getVersion(), field, analyzer);
209
210 Query query = null;
211
212 try {
213 query = queryParser.parse(value);
214 }
215 catch (Exception e) {
216 query = queryParser.parse(KeywordsUtil.escape(value));
217 }
218
219 BooleanClause.Occur occur = null;
220
221 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
222 occur = BooleanClause.Occur.MUST;
223 }
224 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
225 occur = BooleanClause.Occur.MUST_NOT;
226 }
227 else {
228 occur = BooleanClause.Occur.SHOULD;
229 }
230
231 _includeIfUnique(booleanQuery, query, occur, like);
232 }
233 catch (Exception e) {
234 _log.error(e, e);
235 }
236 }
237
238 public void addTerm(
239 BooleanQuery booleanQuery, String field, String[] values,
240 boolean like) {
241
242 for (String value : values) {
243 addTerm(booleanQuery, field, value, like);
244 }
245 }
246
247 public int countScoredFieldNames(Query query, String[] filedNames) {
248 int count = 0;
249
250 for (String fieldName : filedNames) {
251 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
252 query, false, fieldName);
253
254 if ((weightedTerms.length > 0) &&
255 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
256
257 count++;
258 }
259 }
260
261 return count;
262 }
263
264 public void delete(long companyId) {
265 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
266
267 if (indexAccessor == null) {
268 return;
269 }
270
271 indexAccessor.delete();
272 }
273
274 public void deleteDocuments(long companyId, Term term) throws IOException {
275 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
276
277 if (indexAccessor == null) {
278 return;
279 }
280
281 indexAccessor.deleteDocuments(term);
282 }
283
284 public void dumpIndex(long companyId, OutputStream outputStream)
285 throws IOException {
286
287 long lastGeneration = getLastGeneration(companyId);
288
289 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
290 if (_log.isDebugEnabled()) {
291 _log.debug(
292 "Dump index from cluster is not enabled for " + companyId);
293 }
294
295 return;
296 }
297
298 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
299
300 if (indexAccessor == null) {
301 return;
302 }
303
304 indexAccessor.dumpIndex(outputStream);
305 }
306
307 public Analyzer getAnalyzer() {
308 return _analyzer;
309 }
310
311 public long getLastGeneration(long companyId) {
312 if (!isLoadIndexFromClusterEnabled()) {
313 return IndexAccessor.DEFAULT_LAST_GENERATION;
314 }
315
316 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
317
318 if (indexAccessor == null) {
319 return IndexAccessor.DEFAULT_LAST_GENERATION;
320 }
321
322 return indexAccessor.getLastGeneration();
323 }
324
325 public InputStream getLoadIndexesInputStreamFromCluster(
326 long companyId, Address bootupAddress)
327 throws SystemException {
328
329 if (!isLoadIndexFromClusterEnabled()) {
330 return null;
331 }
332
333 InputStream inputStream = null;
334
335 try {
336 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
337 _getBootupClusterNodeObjectValuePair(bootupAddress);
338
339 URL url = bootupClusterNodeObjectValuePair.getValue();
340
341 URLConnection urlConnection = url.openConnection();
342
343 urlConnection.setDoOutput(true);
344
345 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
346 urlConnection.getOutputStream());
347
348 unsyncPrintWriter.write("transientToken=");
349 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
350 unsyncPrintWriter.write("&companyId=");
351 unsyncPrintWriter.write(String.valueOf(companyId));
352
353 unsyncPrintWriter.close();
354
355 inputStream = urlConnection.getInputStream();
356
357 return inputStream;
358 }
359 catch (IOException ioe) {
360 throw new SystemException(ioe);
361 }
362 }
363
364 public String[] getQueryTerms(Query query) {
365 String queryString = StringUtil.replace(
366 query.toString(), StringPool.STAR, StringPool.BLANK);
367
368 Query tempQuery = null;
369
370 try {
371 QueryParser queryParser = new QueryParser(
372 getVersion(), StringPool.BLANK, getAnalyzer());
373
374 tempQuery = queryParser.parse(queryString);
375 }
376 catch (Exception e) {
377 if (_log.isWarnEnabled()) {
378 _log.warn("Unable to parse " + queryString);
379 }
380
381 tempQuery = query;
382 }
383
384 WeightedTerm[] weightedTerms = null;
385
386 for (String fieldName : Field.KEYWORDS) {
387 weightedTerms = QueryTermExtractor.getTerms(
388 tempQuery, false, fieldName);
389
390 if (weightedTerms.length > 0) {
391 break;
392 }
393 }
394
395 Set<String> queryTerms = new HashSet<String>();
396
397 for (WeightedTerm weightedTerm : weightedTerms) {
398 queryTerms.add(weightedTerm.getTerm());
399 }
400
401 return queryTerms.toArray(new String[queryTerms.size()]);
402 }
403
404 public IndexSearcher getSearcher(long companyId, boolean readOnly)
405 throws IOException {
406
407 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
408
409 IndexSearcher indexSearcher = new IndexSearcher(
410 indexAccessor.getLuceneDir(), readOnly);
411
412 indexSearcher.setDefaultFieldSortScoring(true, true);
413 indexSearcher.setSimilarity(new FieldWeightSimilarity());
414
415 return indexSearcher;
416 }
417
418 public String getSnippet(
419 Query query, String field, String s, int maxNumFragments,
420 int fragmentLength, String fragmentSuffix, String preTag,
421 String postTag)
422 throws IOException {
423
424 SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
425 preTag, postTag);
426
427 QueryScorer queryScorer = new QueryScorer(query, field);
428
429 Highlighter highlighter = new Highlighter(
430 simpleHTMLFormatter, queryScorer);
431
432 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
433
434 TokenStream tokenStream = getAnalyzer().tokenStream(
435 field, new UnsyncStringReader(s));
436
437 try {
438 String snippet = highlighter.getBestFragments(
439 tokenStream, s, maxNumFragments, fragmentSuffix);
440
441 if (Validator.isNotNull(snippet) &&
442 !StringUtil.endsWith(snippet, fragmentSuffix)) {
443
444 snippet = snippet.concat(fragmentSuffix);
445 }
446
447 return snippet;
448 }
449 catch (InvalidTokenOffsetsException itoe) {
450 throw new IOException(itoe.getMessage());
451 }
452 }
453
454 public Version getVersion() {
455 return _version;
456 }
457
458 public boolean isLoadIndexFromClusterEnabled() {
459 if (PropsValues.CLUSTER_LINK_ENABLED &&
460 PropsValues.LUCENE_REPLICATE_WRITE) {
461
462 return true;
463 }
464
465 if (_log.isDebugEnabled()) {
466 _log.debug("Load index from cluster is not enabled");
467 }
468
469 return false;
470 }
471
472 public void loadIndex(long companyId, InputStream inputStream)
473 throws IOException {
474
475 if (!isLoadIndexFromClusterEnabled()) {
476 return;
477 }
478
479 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
480
481 if (indexAccessor == null) {
482 if (_log.isInfoEnabled()) {
483 _log.info(
484 "Skip loading Lucene index files for company " + companyId +
485 " in favor of lazy loading");
486 }
487
488 return;
489 }
490
491 StopWatch stopWatch = null;
492
493 if (_log.isInfoEnabled()) {
494 _log.info(
495 "Start loading Lucene index files for company " + companyId);
496
497 stopWatch = new StopWatch();
498
499 stopWatch.start();
500 }
501
502 indexAccessor.loadIndex(inputStream);
503
504 if (_log.isInfoEnabled()) {
505 _log.info(
506 "Finished loading index files for company " + companyId +
507 " in " + stopWatch.getTime() + " ms");
508 }
509 }
510
511 public void loadIndexesFromCluster(long companyId) throws SystemException {
512 if (!isLoadIndexFromClusterEnabled()) {
513 return;
514 }
515
516 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
517
518 if (indexAccessor == null) {
519 return;
520 }
521
522 long localLastGeneration = getLastGeneration(companyId);
523
524 _loadIndexFromCluster(indexAccessor, localLastGeneration);
525 }
526
527 public void setAnalyzer(Analyzer analyzer) {
528 _analyzer = analyzer;
529 }
530
531 public void setVersion(Version version) {
532 _version = version;
533 }
534
535 public void shutdown() {
536 if (_luceneIndexThreadPoolExecutor != null) {
537 _luceneIndexThreadPoolExecutor.shutdownNow();
538
539 try {
540 _luceneIndexThreadPoolExecutor.awaitTermination(
541 60, TimeUnit.SECONDS);
542 }
543 catch (InterruptedException ie) {
544 _log.error("Lucene indexer shutdown interrupted", ie);
545 }
546 }
547
548 if (isLoadIndexFromClusterEnabled()) {
549 ClusterExecutorUtil.removeClusterEventListener(
550 _loadIndexClusterEventListener);
551 }
552
553 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
554 indexAccessor.close();
555 }
556 }
557
558 public void startup(long companyId) {
559 if (PropsValues.INDEX_ON_STARTUP) {
560 if (_log.isInfoEnabled()) {
561 _log.info("Indexing Lucene on startup");
562 }
563
564 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
565
566 if (PropsValues.INDEX_WITH_THREAD) {
567 if (_luceneIndexThreadPoolExecutor == null) {
568
569
570
571
572
573 _luceneIndexThreadPoolExecutor =
574 PortalExecutorManagerUtil.getPortalExecutor(
575 LuceneHelperImpl.class.getName());
576 }
577
578 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
579 }
580 else {
581 luceneIndexer.reindex();
582 }
583 }
584 }
585
586 public void updateDocument(long companyId, Term term, Document document)
587 throws IOException {
588
589 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
590
591 indexAccessor.updateDocument(term, document);
592 }
593
594 private LuceneHelperImpl() {
595 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
596 _luceneIndexThreadPoolExecutor =
597 PortalExecutorManagerUtil.getPortalExecutor(
598 LuceneHelperImpl.class.getName());
599 }
600
601 if (isLoadIndexFromClusterEnabled()) {
602 _loadIndexClusterEventListener =
603 new LoadIndexClusterEventListener();
604
605 ClusterExecutorUtil.addClusterEventListener(
606 _loadIndexClusterEventListener);
607 }
608 }
609
610 private ObjectValuePair<String, URL>
611 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
612 throws SystemException {
613
614 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
615 new MethodHandler(
616 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
617 bootupAddress);
618
619 FutureClusterResponses futureClusterResponses =
620 ClusterExecutorUtil.execute(clusterRequest);
621
622 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
623 futureClusterResponses.getPartialResults();
624
625 try {
626 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
627 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
628
629 String transientToken = (String)clusterNodeResponse.getResult();
630
631 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
632
633 InetAddress inetAddress = clusterNode.getInetAddress();
634
635 URL url = new URL(
636 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
637 "/lucene/dump");
638
639 return new ObjectValuePair<String, URL>(transientToken, url);
640 }
641 catch (Exception e) {
642 throw new SystemException(e);
643 }
644 }
645
646 private IndexAccessor _getIndexAccessor(long companyId) {
647 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
648
649 if (indexAccessor != null) {
650 return indexAccessor;
651 }
652
653 synchronized (this) {
654 indexAccessor = _indexAccessors.get(companyId);
655
656 if (indexAccessor == null) {
657 indexAccessor = new IndexAccessorImpl(companyId);
658
659 if (isLoadIndexFromClusterEnabled()) {
660 boolean clusterForwardMessage = GetterUtil.getBoolean(
661 MessageValuesThreadLocal.getValue(
662 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
663
664 if (clusterForwardMessage) {
665 if (_log.isInfoEnabled()) {
666 _log.info(
667 "Skip Luncene index files cluster loading " +
668 "since this is a manual reindex request");
669 }
670 }
671 else {
672 indexAccessor = new SynchronizedIndexAccessorImpl(
673 indexAccessor);
674
675 try {
676 _loadIndexFromCluster(
677 indexAccessor,
678 indexAccessor.getLastGeneration());
679 }
680 catch (Exception e) {
681 _log.error(
682 "Unable to load index for company " +
683 indexAccessor.getCompanyId(),
684 e);
685 }
686 }
687 }
688
689 _indexAccessors.put(companyId, indexAccessor);
690 }
691 }
692
693 return indexAccessor;
694 }
695
696 private void _includeIfUnique(
697 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
698 boolean like) {
699
700 if (query instanceof TermQuery) {
701 Set<Term> terms = new HashSet<Term>();
702
703 query.extractTerms(terms);
704
705 for (Term term : terms) {
706 String termValue = term.text();
707
708 if (like) {
709 term = term.createTerm(
710 StringPool.STAR.concat(termValue).concat(
711 StringPool.STAR));
712
713 query = new WildcardQuery(term);
714 }
715 else {
716 query = new TermQuery(term);
717 }
718
719 boolean included = false;
720
721 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
722 if (query.equals(booleanClause.getQuery())) {
723 included = true;
724 }
725 }
726
727 if (!included) {
728 booleanQuery.add(query, occur);
729 }
730 }
731 }
732 else if (query instanceof BooleanQuery) {
733 BooleanQuery curBooleanQuery = (BooleanQuery)query;
734
735 BooleanQuery containerBooleanQuery = new BooleanQuery();
736
737 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
738 _includeIfUnique(
739 containerBooleanQuery, booleanClause.getQuery(),
740 booleanClause.getOccur(), like);
741 }
742
743 if (containerBooleanQuery.getClauses().length > 0) {
744 booleanQuery.add(containerBooleanQuery, occur);
745 }
746 }
747 else {
748 boolean included = false;
749
750 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
751 if (query.equals(booleanClause.getQuery())) {
752 included = true;
753 }
754 }
755
756 if (!included) {
757 booleanQuery.add(query, occur);
758 }
759 }
760 }
761
762 private void _loadIndexFromCluster(
763 IndexAccessor indexAccessor, long localLastGeneration)
764 throws SystemException {
765
766 List<Address> clusterNodeAddresses =
767 ClusterExecutorUtil.getClusterNodeAddresses();
768
769 int clusterNodeAddressesCount = clusterNodeAddresses.size();
770
771 if (clusterNodeAddressesCount <= 1) {
772 if (_log.isDebugEnabled()) {
773 _log.debug(
774 "Do not load indexes because there is either one portal " +
775 "instance or no portal instances in the cluster");
776 }
777
778 return;
779 }
780
781 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
782 new MethodHandler(
783 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
784 true);
785
786 ClusterExecutorUtil.execute(
787 clusterRequest,
788 new LoadIndexClusterResponseCallback(
789 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
790 }
791
792 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
793
794 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
795
796 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
797
798 private static MethodKey _createTokenMethodKey =
799 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
800 long.class);
801 private static MethodKey _getLastGenerationMethodKey =
802 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
803 long.class);
804
805 private Analyzer _analyzer;
806 private Map<Long, IndexAccessor> _indexAccessors =
807 new ConcurrentHashMap<Long, IndexAccessor>();
808 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
809 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
810 private Version _version;
811
812 private class LoadIndexClusterEventListener
813 implements ClusterEventListener {
814
815 public void processClusterEvent(ClusterEvent clusterEvent) {
816 ClusterEventType clusterEventType =
817 clusterEvent.getClusterEventType();
818
819 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
820 return;
821 }
822
823 List<Address> clusterNodeAddresses =
824 ClusterExecutorUtil.getClusterNodeAddresses();
825 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
826
827 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
828 if (_log.isDebugEnabled()) {
829 _log.debug(
830 "Number of original cluster members is greater than " +
831 "one");
832 }
833
834 return;
835 }
836
837 long[] companyIds = PortalInstances.getCompanyIds();
838
839 for (long companyId : companyIds) {
840 loadIndexes(companyId);
841 }
842
843 loadIndexes(CompanyConstants.SYSTEM);
844 }
845
846 private void loadIndexes(long companyId) {
847 long lastGeneration = getLastGeneration(companyId);
848
849 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
850 return;
851 }
852
853 try {
854 LuceneClusterUtil.loadIndexesFromCluster(companyId);
855 }
856 catch (Exception e) {
857 _log.error(
858 "Unable to load indexes for company " + companyId, e);
859 }
860 }
861
862 }
863
864 private class LoadIndexClusterResponseCallback
865 extends BaseClusterResponseCallback {
866
867 public LoadIndexClusterResponseCallback(
868 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
869 long localLastGeneration) {
870
871 _indexAccessor = indexAccessor;
872 _clusterNodeAddressesCount = clusterNodeAddressesCount;
873 _localLastGeneration = localLastGeneration;
874
875 _companyId = _indexAccessor.getCompanyId();
876 }
877
878 @Override
879 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
880 Address bootupAddress = null;
881
882 do {
883 _clusterNodeAddressesCount--;
884
885 ClusterNodeResponse clusterNodeResponse = null;
886
887 try {
888 clusterNodeResponse = blockingQueue.poll(
889 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
890 TimeUnit.MILLISECONDS);
891 }
892 catch (Exception e) {
893 _log.error("Unable to get cluster node response", e);
894 }
895
896 if (clusterNodeResponse == null) {
897 if (_log.isDebugEnabled()) {
898 _log.debug(
899 "Unable to get cluster node response in " +
900 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
901 TimeUnit.MILLISECONDS);
902 }
903
904 continue;
905 }
906
907 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
908
909 if (clusterNode.getPort() > 0) {
910 try {
911 long remoteLastGeneration =
912 (Long)clusterNodeResponse.getResult();
913
914 if (remoteLastGeneration > _localLastGeneration) {
915 bootupAddress = clusterNodeResponse.getAddress();
916
917 break;
918 }
919 }
920 catch (Exception e) {
921 if (_log.isDebugEnabled()) {
922 _log.debug(
923 "Suppress exception caused by remote method " +
924 "invocation",
925 e);
926 }
927
928 continue;
929 }
930 }
931 else {
932 if (_log.isDebugEnabled()) {
933 _log.debug(
934 "Cluster node " + clusterNode +
935 " has invalid port");
936 }
937 }
938 }
939 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
940
941 if (bootupAddress == null) {
942 return;
943 }
944
945 if (_log.isInfoEnabled()) {
946 _log.info(
947 "Start loading lucene index files from cluster node " +
948 bootupAddress);
949 }
950
951 InputStream inputStream = null;
952
953 try {
954 inputStream = getLoadIndexesInputStreamFromCluster(
955 _companyId, bootupAddress);
956
957 _indexAccessor.loadIndex(inputStream);
958
959 if (_log.isInfoEnabled()) {
960 _log.info("Lucene index files loaded successfully");
961 }
962 }
963 catch (Exception e) {
964 _log.error("Unable to load index for company " + _companyId, e);
965 }
966 finally {
967 if (inputStream != null) {
968 try {
969 inputStream.close();
970 }
971 catch (IOException ioe) {
972 _log.error(
973 "Unable to close input stream for company " +
974 _companyId,
975 ioe);
976 }
977 }
978 }
979 }
980
981 @Override
982 public void processTimeoutException(TimeoutException timeoutException) {
983 _log.error(
984 "Uanble to load index for company " + _companyId,
985 timeoutException);
986 }
987
988 private int _clusterNodeAddressesCount;
989 private long _companyId;
990 private IndexAccessor _indexAccessor;
991 private long _localLastGeneration;
992
993 }
994
995 }