001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterEventType;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNode;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036 import com.liferay.portal.kernel.search.BooleanClauseOccur;
037 import com.liferay.portal.kernel.search.Field;
038 import com.liferay.portal.kernel.util.ArrayUtil;
039 import com.liferay.portal.kernel.util.GetterUtil;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.StringPool;
044 import com.liferay.portal.kernel.util.StringUtil;
045 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046 import com.liferay.portal.kernel.util.Validator;
047 import com.liferay.portal.model.CompanyConstants;
048 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050 import com.liferay.portal.security.auth.TransientTokenUtil;
051 import com.liferay.portal.util.PortalInstances;
052 import com.liferay.portal.util.PropsValues;
053 import com.liferay.util.lucene.KeywordsUtil;
054
055 import java.io.IOException;
056 import java.io.InputStream;
057 import java.io.OutputStream;
058
059 import java.net.InetAddress;
060 import java.net.URL;
061 import java.net.URLConnection;
062
063 import java.util.HashSet;
064 import java.util.List;
065 import java.util.Map;
066 import java.util.Set;
067 import java.util.concurrent.BlockingQueue;
068 import java.util.concurrent.ConcurrentHashMap;
069 import java.util.concurrent.TimeUnit;
070 import java.util.concurrent.TimeoutException;
071
072 import org.apache.commons.lang.time.StopWatch;
073 import org.apache.lucene.analysis.Analyzer;
074 import org.apache.lucene.analysis.TokenStream;
075 import org.apache.lucene.document.Document;
076 import org.apache.lucene.index.IndexReader;
077 import org.apache.lucene.index.Term;
078 import org.apache.lucene.queryParser.QueryParser;
079 import org.apache.lucene.search.BooleanClause;
080 import org.apache.lucene.search.BooleanQuery;
081 import org.apache.lucene.search.IndexSearcher;
082 import org.apache.lucene.search.NumericRangeQuery;
083 import org.apache.lucene.search.Query;
084 import org.apache.lucene.search.TermQuery;
085 import org.apache.lucene.search.TermRangeQuery;
086 import org.apache.lucene.search.WildcardQuery;
087 import org.apache.lucene.search.highlight.Formatter;
088 import org.apache.lucene.search.highlight.Highlighter;
089 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
090 import org.apache.lucene.search.highlight.QueryScorer;
091 import org.apache.lucene.search.highlight.SimpleFragmenter;
092 import org.apache.lucene.search.highlight.WeightedTerm;
093 import org.apache.lucene.util.Version;
094
095
103 public class LuceneHelperImpl implements LuceneHelper {
104
105 public void addDocument(long companyId, Document document)
106 throws IOException {
107
108 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
109
110 indexAccessor.addDocument(document);
111 }
112
113 public void addExactTerm(
114 BooleanQuery booleanQuery, String field, String value) {
115
116 addTerm(booleanQuery, field, value, false);
117 }
118
119 public void addNumericRangeTerm(
120 BooleanQuery booleanQuery, String field, String startValue,
121 String endValue) {
122
123 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
124 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
125 true, true);
126
127 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
128 }
129
130 public void addRangeTerm(
131 BooleanQuery booleanQuery, String field, String startValue,
132 String endValue) {
133
134 boolean includesLower = true;
135
136 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
137 includesLower = false;
138 }
139
140 boolean includesUpper = true;
141
142 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
143 includesUpper = false;
144 }
145
146 TermRangeQuery termRangeQuery = new TermRangeQuery(
147 field, startValue, endValue, includesLower, includesUpper);
148
149 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
150 }
151
152 public void addRequiredTerm(
153 BooleanQuery booleanQuery, String field, String value, boolean like) {
154
155 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
156 }
157
158 public void addRequiredTerm(
159 BooleanQuery booleanQuery, String field, String[] values,
160 boolean like) {
161
162 if (values == null) {
163 return;
164 }
165
166 BooleanQuery query = new BooleanQuery();
167
168 for (String value : values) {
169 addTerm(query, field, value, like);
170 }
171
172 booleanQuery.add(query, BooleanClause.Occur.MUST);
173 }
174
175 public void addTerm(
176 BooleanQuery booleanQuery, String field, String value, boolean like) {
177
178 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
179 }
180
181 public void addTerm(
182 BooleanQuery booleanQuery, String field, String value, boolean like,
183 BooleanClauseOccur booleanClauseOccur) {
184
185 if (Validator.isNull(value)) {
186 return;
187 }
188
189 Analyzer analyzer = getAnalyzer();
190
191 if (analyzer instanceof PerFieldAnalyzer) {
192 PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
193
194 Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
195
196 if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
197 like = true;
198 }
199 }
200
201 if (like) {
202 value = StringUtil.replace(
203 value, StringPool.PERCENT, StringPool.BLANK);
204 }
205
206 try {
207 QueryParser queryParser = new QueryParser(
208 getVersion(), field, analyzer);
209
210 Query query = null;
211
212 try {
213 if (like) {
214
215
216
217 value = value.toLowerCase(queryParser.getLocale());
218 }
219
220 query = queryParser.parse(value);
221 }
222 catch (Exception e) {
223 query = queryParser.parse(KeywordsUtil.escape(value));
224 }
225
226 BooleanClause.Occur occur = null;
227
228 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
229 occur = BooleanClause.Occur.MUST;
230 }
231 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
232 occur = BooleanClause.Occur.MUST_NOT;
233 }
234 else {
235 occur = BooleanClause.Occur.SHOULD;
236 }
237
238 _includeIfUnique(booleanQuery, query, occur, like);
239 }
240 catch (Exception e) {
241 _log.error(e, e);
242 }
243 }
244
245 public void addTerm(
246 BooleanQuery booleanQuery, String field, String[] values,
247 boolean like) {
248
249 for (String value : values) {
250 addTerm(booleanQuery, field, value, like);
251 }
252 }
253
254 public int countScoredFieldNames(Query query, String[] filedNames) {
255 int count = 0;
256
257 for (String fieldName : filedNames) {
258 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
259 query, false, fieldName);
260
261 if ((weightedTerms.length > 0) &&
262 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
263
264 count++;
265 }
266 }
267
268 return count;
269 }
270
271 public void delete(long companyId) {
272 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
273
274 if (indexAccessor == null) {
275 return;
276 }
277
278 indexAccessor.delete();
279 }
280
281 public void deleteDocuments(long companyId, Term term) throws IOException {
282 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
283
284 if (indexAccessor == null) {
285 return;
286 }
287
288 indexAccessor.deleteDocuments(term);
289 }
290
291 public void dumpIndex(long companyId, OutputStream outputStream)
292 throws IOException {
293
294 long lastGeneration = getLastGeneration(companyId);
295
296 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
297 if (_log.isDebugEnabled()) {
298 _log.debug(
299 "Dump index from cluster is not enabled for " + companyId);
300 }
301
302 return;
303 }
304
305 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
306
307 if (indexAccessor == null) {
308 return;
309 }
310
311 indexAccessor.dumpIndex(outputStream);
312 }
313
314 public Analyzer getAnalyzer() {
315 return _analyzer;
316 }
317
318 public long getLastGeneration(long companyId) {
319 if (!isLoadIndexFromClusterEnabled()) {
320 return IndexAccessor.DEFAULT_LAST_GENERATION;
321 }
322
323 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
324
325 if (indexAccessor == null) {
326 return IndexAccessor.DEFAULT_LAST_GENERATION;
327 }
328
329 return indexAccessor.getLastGeneration();
330 }
331
332 public InputStream getLoadIndexesInputStreamFromCluster(
333 long companyId, Address bootupAddress)
334 throws SystemException {
335
336 if (!isLoadIndexFromClusterEnabled()) {
337 return null;
338 }
339
340 InputStream inputStream = null;
341
342 try {
343 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
344 _getBootupClusterNodeObjectValuePair(bootupAddress);
345
346 URL url = bootupClusterNodeObjectValuePair.getValue();
347
348 URLConnection urlConnection = url.openConnection();
349
350 urlConnection.setDoOutput(true);
351
352 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
353 urlConnection.getOutputStream());
354
355 unsyncPrintWriter.write("transientToken=");
356 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
357 unsyncPrintWriter.write("&companyId=");
358 unsyncPrintWriter.write(String.valueOf(companyId));
359
360 unsyncPrintWriter.close();
361
362 inputStream = urlConnection.getInputStream();
363
364 return inputStream;
365 }
366 catch (IOException ioe) {
367 throw new SystemException(ioe);
368 }
369 }
370
371 public Set<String> getQueryTerms(Query query) {
372 String queryString = StringUtil.replace(
373 query.toString(), StringPool.STAR, StringPool.BLANK);
374
375 Query tempQuery = null;
376
377 try {
378 QueryParser queryParser = new QueryParser(
379 getVersion(), StringPool.BLANK, getAnalyzer());
380
381 tempQuery = queryParser.parse(queryString);
382 }
383 catch (Exception e) {
384 if (_log.isWarnEnabled()) {
385 _log.warn("Unable to parse " + queryString);
386 }
387
388 tempQuery = query;
389 }
390
391 WeightedTerm[] weightedTerms = null;
392
393 for (String fieldName : Field.KEYWORDS) {
394 weightedTerms = QueryTermExtractor.getTerms(
395 tempQuery, false, fieldName);
396
397 if (weightedTerms.length > 0) {
398 break;
399 }
400 }
401
402 Set<String> queryTerms = new HashSet<String>();
403
404 for (WeightedTerm weightedTerm : weightedTerms) {
405 queryTerms.add(weightedTerm.getTerm());
406 }
407
408 return queryTerms;
409 }
410
411 public IndexSearcher getSearcher(long companyId, boolean readOnly)
412 throws IOException {
413
414 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
415
416 IndexReader indexReader = IndexReader.open(
417 indexAccessor.getLuceneDir(), readOnly);
418
419 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
420
421 indexSearcher.setDefaultFieldSortScoring(true, true);
422 indexSearcher.setSimilarity(new FieldWeightSimilarity());
423
424 return indexSearcher;
425 }
426
427 public String getSnippet(
428 Query query, String field, String s, int maxNumFragments,
429 int fragmentLength, String fragmentSuffix, Formatter formatter)
430 throws IOException {
431
432 QueryScorer queryScorer = new QueryScorer(query, field);
433
434 Highlighter highlighter = new Highlighter(formatter, queryScorer);
435
436 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
437
438 TokenStream tokenStream = getAnalyzer().tokenStream(
439 field, new UnsyncStringReader(s));
440
441 try {
442 String snippet = highlighter.getBestFragments(
443 tokenStream, s, maxNumFragments, fragmentSuffix);
444
445 if (Validator.isNotNull(snippet) &&
446 !StringUtil.endsWith(snippet, fragmentSuffix)) {
447
448 snippet = snippet.concat(fragmentSuffix);
449 }
450
451 return snippet;
452 }
453 catch (InvalidTokenOffsetsException itoe) {
454 throw new IOException(itoe.getMessage());
455 }
456 }
457
458 public Version getVersion() {
459 return _version;
460 }
461
462 public boolean isLoadIndexFromClusterEnabled() {
463 if (PropsValues.CLUSTER_LINK_ENABLED &&
464 PropsValues.LUCENE_REPLICATE_WRITE) {
465
466 return true;
467 }
468
469 if (_log.isDebugEnabled()) {
470 _log.debug("Load index from cluster is not enabled");
471 }
472
473 return false;
474 }
475
476 public void loadIndex(long companyId, InputStream inputStream)
477 throws IOException {
478
479 if (!isLoadIndexFromClusterEnabled()) {
480 return;
481 }
482
483 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
484
485 if (indexAccessor == null) {
486 if (_log.isInfoEnabled()) {
487 _log.info(
488 "Skip loading Lucene index files for company " + companyId +
489 " in favor of lazy loading");
490 }
491
492 return;
493 }
494
495 StopWatch stopWatch = null;
496
497 if (_log.isInfoEnabled()) {
498 _log.info(
499 "Start loading Lucene index files for company " + companyId);
500
501 stopWatch = new StopWatch();
502
503 stopWatch.start();
504 }
505
506 indexAccessor.loadIndex(inputStream);
507
508 if (_log.isInfoEnabled()) {
509 _log.info(
510 "Finished loading index files for company " + companyId +
511 " in " + stopWatch.getTime() + " ms");
512 }
513 }
514
515 public void loadIndexesFromCluster(long companyId) throws SystemException {
516 if (!isLoadIndexFromClusterEnabled()) {
517 return;
518 }
519
520 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
521
522 if (indexAccessor == null) {
523 return;
524 }
525
526 long localLastGeneration = getLastGeneration(companyId);
527
528 _loadIndexFromCluster(indexAccessor, localLastGeneration);
529 }
530
531 public void setAnalyzer(Analyzer analyzer) {
532 _analyzer = analyzer;
533 }
534
535 public void setVersion(Version version) {
536 _version = version;
537 }
538
539 public void shutdown() {
540 if (_luceneIndexThreadPoolExecutor != null) {
541 _luceneIndexThreadPoolExecutor.shutdownNow();
542
543 try {
544 _luceneIndexThreadPoolExecutor.awaitTermination(
545 60, TimeUnit.SECONDS);
546 }
547 catch (InterruptedException ie) {
548 _log.error("Lucene indexer shutdown interrupted", ie);
549 }
550 }
551
552 if (isLoadIndexFromClusterEnabled()) {
553 ClusterExecutorUtil.removeClusterEventListener(
554 _loadIndexClusterEventListener);
555 }
556
557 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
558 indexAccessor.close();
559 }
560 }
561
562 public void startup(long companyId) {
563 if (PropsValues.INDEX_ON_STARTUP) {
564 if (_log.isInfoEnabled()) {
565 _log.info("Indexing Lucene on startup");
566 }
567
568 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
569
570 if (PropsValues.INDEX_WITH_THREAD) {
571 if (_luceneIndexThreadPoolExecutor == null) {
572
573
574
575
576
577 _luceneIndexThreadPoolExecutor =
578 PortalExecutorManagerUtil.getPortalExecutor(
579 LuceneHelperImpl.class.getName());
580 }
581
582 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
583 }
584 else {
585 luceneIndexer.reindex();
586 }
587 }
588 }
589
590 public void updateDocument(long companyId, Term term, Document document)
591 throws IOException {
592
593 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
594
595 indexAccessor.updateDocument(term, document);
596 }
597
598 private LuceneHelperImpl() {
599 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
600 _luceneIndexThreadPoolExecutor =
601 PortalExecutorManagerUtil.getPortalExecutor(
602 LuceneHelperImpl.class.getName());
603 }
604
605 if (isLoadIndexFromClusterEnabled()) {
606 _loadIndexClusterEventListener =
607 new LoadIndexClusterEventListener();
608
609 ClusterExecutorUtil.addClusterEventListener(
610 _loadIndexClusterEventListener);
611 }
612 }
613
614 private ObjectValuePair<String, URL>
615 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
616 throws SystemException {
617
618 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
619 new MethodHandler(
620 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
621 bootupAddress);
622
623 FutureClusterResponses futureClusterResponses =
624 ClusterExecutorUtil.execute(clusterRequest);
625
626 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
627 futureClusterResponses.getPartialResults();
628
629 try {
630 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
631 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
632
633 String transientToken = (String)clusterNodeResponse.getResult();
634
635 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
636
637 InetAddress inetAddress = clusterNode.getInetAddress();
638
639 URL url = new URL(
640 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
641 "/lucene/dump");
642
643 return new ObjectValuePair<String, URL>(transientToken, url);
644 }
645 catch (Exception e) {
646 throw new SystemException(e);
647 }
648 }
649
650 private IndexAccessor _getIndexAccessor(long companyId) {
651 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652
653 if (indexAccessor != null) {
654 return indexAccessor;
655 }
656
657 synchronized (this) {
658 indexAccessor = _indexAccessors.get(companyId);
659
660 if (indexAccessor == null) {
661 indexAccessor = new IndexAccessorImpl(companyId);
662
663 if (isLoadIndexFromClusterEnabled()) {
664 boolean clusterForwardMessage = GetterUtil.getBoolean(
665 MessageValuesThreadLocal.getValue(
666 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
667
668 if (clusterForwardMessage) {
669 if (_log.isInfoEnabled()) {
670 _log.info(
671 "Skip Luncene index files cluster loading " +
672 "since this is a manual reindex request");
673 }
674 }
675 else {
676 indexAccessor = new SynchronizedIndexAccessorImpl(
677 indexAccessor);
678
679 try {
680 _loadIndexFromCluster(
681 indexAccessor,
682 indexAccessor.getLastGeneration());
683 }
684 catch (Exception e) {
685 _log.error(
686 "Unable to load index for company " +
687 indexAccessor.getCompanyId(),
688 e);
689 }
690 }
691 }
692
693 _indexAccessors.put(companyId, indexAccessor);
694 }
695 }
696
697 return indexAccessor;
698 }
699
700 private void _includeIfUnique(
701 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
702 boolean like) {
703
704 if (query instanceof TermQuery) {
705 Set<Term> terms = new HashSet<Term>();
706
707 query.extractTerms(terms);
708
709 for (Term term : terms) {
710 String termValue = term.text();
711
712 if (like) {
713 term = term.createTerm(
714 StringPool.STAR.concat(termValue).concat(
715 StringPool.STAR));
716
717 query = new WildcardQuery(term);
718 }
719 else {
720 query = new TermQuery(term);
721 }
722
723 boolean included = false;
724
725 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
726 if (query.equals(booleanClause.getQuery())) {
727 included = true;
728 }
729 }
730
731 if (!included) {
732 booleanQuery.add(query, occur);
733 }
734 }
735 }
736 else if (query instanceof BooleanQuery) {
737 BooleanQuery curBooleanQuery = (BooleanQuery)query;
738
739 BooleanQuery containerBooleanQuery = new BooleanQuery();
740
741 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
742 _includeIfUnique(
743 containerBooleanQuery, booleanClause.getQuery(),
744 booleanClause.getOccur(), like);
745 }
746
747 if (containerBooleanQuery.getClauses().length > 0) {
748 booleanQuery.add(containerBooleanQuery, occur);
749 }
750 }
751 else {
752 boolean included = false;
753
754 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
755 if (query.equals(booleanClause.getQuery())) {
756 included = true;
757 }
758 }
759
760 if (!included) {
761 booleanQuery.add(query, occur);
762 }
763 }
764 }
765
766 private void _loadIndexFromCluster(
767 IndexAccessor indexAccessor, long localLastGeneration)
768 throws SystemException {
769
770 List<Address> clusterNodeAddresses =
771 ClusterExecutorUtil.getClusterNodeAddresses();
772
773 int clusterNodeAddressesCount = clusterNodeAddresses.size();
774
775 if (clusterNodeAddressesCount <= 1) {
776 if (_log.isDebugEnabled()) {
777 _log.debug(
778 "Do not load indexes because there is either one portal " +
779 "instance or no portal instances in the cluster");
780 }
781
782 return;
783 }
784
785 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
786 new MethodHandler(
787 _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
788 true);
789
790 ClusterExecutorUtil.execute(
791 clusterRequest,
792 new LoadIndexClusterResponseCallback(
793 indexAccessor, clusterNodeAddressesCount, localLastGeneration));
794 }
795
796 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
797
798 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
799
800 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
801
802 private static MethodKey _createTokenMethodKey =
803 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
804 long.class);
805 private static MethodKey _getLastGenerationMethodKey =
806 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
807 long.class);
808
809 private Analyzer _analyzer;
810 private Map<Long, IndexAccessor> _indexAccessors =
811 new ConcurrentHashMap<Long, IndexAccessor>();
812 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
813 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
814 private Version _version;
815
816 private class LoadIndexClusterEventListener
817 implements ClusterEventListener {
818
819 public void processClusterEvent(ClusterEvent clusterEvent) {
820 ClusterEventType clusterEventType =
821 clusterEvent.getClusterEventType();
822
823 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
824 return;
825 }
826
827 List<Address> clusterNodeAddresses =
828 ClusterExecutorUtil.getClusterNodeAddresses();
829 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
830
831 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
832 if (_log.isDebugEnabled()) {
833 _log.debug(
834 "Number of original cluster members is greater than " +
835 "one");
836 }
837
838 return;
839 }
840
841 long[] companyIds = PortalInstances.getCompanyIds();
842
843 for (long companyId : companyIds) {
844 loadIndexes(companyId);
845 }
846
847 loadIndexes(CompanyConstants.SYSTEM);
848 }
849
850 private void loadIndexes(long companyId) {
851 long lastGeneration = getLastGeneration(companyId);
852
853 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
854 return;
855 }
856
857 try {
858 LuceneClusterUtil.loadIndexesFromCluster(companyId);
859 }
860 catch (Exception e) {
861 _log.error(
862 "Unable to load indexes for company " + companyId, e);
863 }
864 }
865
866 }
867
868 private class LoadIndexClusterResponseCallback
869 extends BaseClusterResponseCallback {
870
871 public LoadIndexClusterResponseCallback(
872 IndexAccessor indexAccessor, int clusterNodeAddressesCount,
873 long localLastGeneration) {
874
875 _indexAccessor = indexAccessor;
876 _clusterNodeAddressesCount = clusterNodeAddressesCount;
877 _localLastGeneration = localLastGeneration;
878
879 _companyId = _indexAccessor.getCompanyId();
880 }
881
882 @Override
883 public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
884 Address bootupAddress = null;
885
886 do {
887 _clusterNodeAddressesCount--;
888
889 ClusterNodeResponse clusterNodeResponse = null;
890
891 try {
892 clusterNodeResponse = blockingQueue.poll(
893 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
894 TimeUnit.MILLISECONDS);
895 }
896 catch (Exception e) {
897 _log.error("Unable to get cluster node response", e);
898 }
899
900 if (clusterNodeResponse == null) {
901 if (_log.isDebugEnabled()) {
902 _log.debug(
903 "Unable to get cluster node response in " +
904 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
905 TimeUnit.MILLISECONDS);
906 }
907
908 continue;
909 }
910
911 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
912
913 if (clusterNode.getPort() > 0) {
914 try {
915 long remoteLastGeneration =
916 (Long)clusterNodeResponse.getResult();
917
918 if (remoteLastGeneration > _localLastGeneration) {
919 bootupAddress = clusterNodeResponse.getAddress();
920
921 break;
922 }
923 }
924 catch (Exception e) {
925 if (_log.isDebugEnabled()) {
926 _log.debug(
927 "Suppress exception caused by remote method " +
928 "invocation",
929 e);
930 }
931
932 continue;
933 }
934 }
935 else {
936 if (_log.isDebugEnabled()) {
937 _log.debug(
938 "Cluster node " + clusterNode +
939 " has invalid port");
940 }
941 }
942 }
943 while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
944
945 if (bootupAddress == null) {
946 return;
947 }
948
949 if (_log.isInfoEnabled()) {
950 _log.info(
951 "Start loading lucene index files from cluster node " +
952 bootupAddress);
953 }
954
955 InputStream inputStream = null;
956
957 try {
958 inputStream = getLoadIndexesInputStreamFromCluster(
959 _companyId, bootupAddress);
960
961 _indexAccessor.loadIndex(inputStream);
962
963 if (_log.isInfoEnabled()) {
964 _log.info("Lucene index files loaded successfully");
965 }
966 }
967 catch (Exception e) {
968 _log.error("Unable to load index for company " + _companyId, e);
969 }
970 finally {
971 if (inputStream != null) {
972 try {
973 inputStream.close();
974 }
975 catch (IOException ioe) {
976 _log.error(
977 "Unable to close input stream for company " +
978 _companyId,
979 ioe);
980 }
981 }
982 }
983 }
984
985 @Override
986 public void processTimeoutException(TimeoutException timeoutException) {
987 _log.error(
988 "Uanble to load index for company " + _companyId,
989 timeoutException);
990 }
991
992 private int _clusterNodeAddressesCount;
993 private long _companyId;
994 private IndexAccessor _indexAccessor;
995 private long _localLastGeneration;
996
997 }
998
999 }