001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEntry;
035 import com.liferay.portal.kernel.scheduler.SchedulerEntryImpl;
036 import com.liferay.portal.kernel.scheduler.StorageType;
037 import com.liferay.portal.kernel.scheduler.TimeUnit;
038 import com.liferay.portal.kernel.scheduler.TriggerType;
039 import com.liferay.portal.kernel.search.Field;
040 import com.liferay.portal.kernel.util.ArrayUtil;
041 import com.liferay.portal.kernel.util.GetterUtil;
042 import com.liferay.portal.kernel.util.MethodHandler;
043 import com.liferay.portal.kernel.util.MethodKey;
044 import com.liferay.portal.kernel.util.ObjectValuePair;
045 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.kernel.util.StringUtil;
048 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
049 import com.liferay.portal.kernel.util.Validator;
050 import com.liferay.portal.model.CompanyConstants;
051 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
052 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
053 import com.liferay.portal.search.lucene.messaging.CleanUpMessageListener;
054 import com.liferay.portal.security.auth.TransientTokenUtil;
055 import com.liferay.portal.util.PortalInstances;
056 import com.liferay.portal.util.PropsValues;
057
058 import java.io.IOException;
059 import java.io.InputStream;
060 import java.io.OutputStream;
061
062 import java.net.InetAddress;
063 import java.net.URL;
064 import java.net.URLConnection;
065
066 import java.util.HashSet;
067 import java.util.List;
068 import java.util.Map;
069 import java.util.Set;
070 import java.util.concurrent.BlockingQueue;
071 import java.util.concurrent.ConcurrentHashMap;
072
073 import org.apache.lucene.analysis.Analyzer;
074 import org.apache.lucene.analysis.TokenStream;
075 import org.apache.lucene.document.Document;
076 import org.apache.lucene.index.Term;
077 import org.apache.lucene.queryParser.QueryParser;
078 import org.apache.lucene.search.BooleanClause;
079 import org.apache.lucene.search.BooleanQuery;
080 import org.apache.lucene.search.IndexSearcher;
081 import org.apache.lucene.search.NumericRangeQuery;
082 import org.apache.lucene.search.Query;
083 import org.apache.lucene.search.TermQuery;
084 import org.apache.lucene.search.TermRangeQuery;
085 import org.apache.lucene.search.WildcardQuery;
086 import org.apache.lucene.search.highlight.Highlighter;
087 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
088 import org.apache.lucene.search.highlight.QueryScorer;
089 import org.apache.lucene.search.highlight.SimpleFragmenter;
090 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
091 import org.apache.lucene.search.highlight.WeightedTerm;
092 import org.apache.lucene.util.Version;
093
094
102 public class LuceneHelperImpl implements LuceneHelper {
103
104 public void addDocument(long companyId, Document document)
105 throws IOException {
106
107 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
108
109 indexAccessor.addDocument(document);
110 }
111
112 public void addExactTerm(
113 BooleanQuery booleanQuery, String field, String value) {
114
115 addTerm(booleanQuery, field, value, false);
116 }
117
118 public void addNumericRangeTerm(
119 BooleanQuery booleanQuery, String field, String startValue,
120 String endValue) {
121
122 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
123 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
124 true, true);
125
126 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
127 }
128
129 public void addRangeTerm(
130 BooleanQuery booleanQuery, String field, String startValue,
131 String endValue) {
132
133 boolean includesLower = true;
134
135 if (startValue.equals(StringPool.STAR)) {
136 includesLower = false;
137 }
138
139 boolean includesUpper = true;
140
141 if (endValue.equals(StringPool.STAR)) {
142 includesUpper = false;
143 }
144
145 TermRangeQuery termRangeQuery = new TermRangeQuery(
146 field, startValue, endValue, includesLower, includesUpper);
147
148 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
149 }
150
151 public void addRequiredTerm(
152 BooleanQuery booleanQuery, String field, String value, boolean like) {
153
154 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
155 }
156
157 public void addRequiredTerm(
158 BooleanQuery booleanQuery, String field, String[] values,
159 boolean like) {
160
161 if (values == null) {
162 return;
163 }
164
165 BooleanQuery query = new BooleanQuery();
166
167 for (String value : values) {
168 addTerm(query, field, value, like);
169 }
170
171 booleanQuery.add(query, BooleanClause.Occur.MUST);
172 }
173
174 public void addTerm(
175 BooleanQuery booleanQuery, String field, String value, boolean like) {
176
177 if (Validator.isNull(value)) {
178 return;
179 }
180
181 if (like) {
182 value = StringUtil.replace(
183 value, StringPool.PERCENT, StringPool.BLANK);
184 }
185
186 try {
187 QueryParser queryParser = new QueryParser(
188 getVersion(), field, getAnalyzer());
189
190 Query query = queryParser.parse(value);
191
192 _includeIfUnique(
193 booleanQuery, query, BooleanClause.Occur.SHOULD, like);
194 }
195 catch (Exception e) {
196 _log.error(e, e);
197 }
198 }
199
200 public void addTerm(
201 BooleanQuery booleanQuery, String field, String[] values,
202 boolean like) {
203
204 for (String value : values) {
205 addTerm(booleanQuery, field, value, like);
206 }
207 }
208
209 public int countScoredFieldNames(Query query, String[] filedNames) {
210 int count = 0;
211
212 for (String fieldName : filedNames) {
213 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
214 query, false, fieldName);
215
216 if ((weightedTerms.length > 0) &&
217 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
218
219 count++;
220 }
221 }
222
223 return count;
224 }
225
226 public void delete(long companyId) {
227 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
228
229 if (indexAccessor == null) {
230 return;
231 }
232
233 indexAccessor.delete();
234 }
235
236 public void deleteDocuments(long companyId, Term term) throws IOException {
237 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
238
239 if (indexAccessor == null) {
240 return;
241 }
242
243 indexAccessor.deleteDocuments(term);
244 }
245
246 public void dumpIndex(long companyId, OutputStream outputStream)
247 throws IOException {
248
249 long lastGeneration = getLastGeneration(companyId);
250
251 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
252 if (_log.isDebugEnabled()) {
253 _log.debug(
254 "Dump index from cluster is not enabled for " + companyId);
255 }
256
257 return;
258 }
259
260 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
261
262 if (indexAccessor == null) {
263 return;
264 }
265
266 indexAccessor.dumpIndex(outputStream);
267 }
268
269 public Analyzer getAnalyzer() {
270 return _analyzer;
271 }
272
273 public long getLastGeneration(long companyId) {
274 if (!isLoadIndexFromClusterEnabled()) {
275 return IndexAccessor.DEFAULT_LAST_GENERATION;
276 }
277
278 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
279
280 if (indexAccessor == null) {
281 return IndexAccessor.DEFAULT_LAST_GENERATION;
282 }
283
284 return indexAccessor.getLastGeneration();
285 }
286
287 public InputStream getLoadIndexesInputStreamFromCluster(
288 long companyId, Address bootupAddress)
289 throws SystemException {
290
291 if (!isLoadIndexFromClusterEnabled()) {
292 return null;
293 }
294
295 InputStream inputStream = null;
296
297 try {
298 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
299 _getBootupClusterNodeObjectValuePair(bootupAddress);
300
301 URL url = bootupClusterNodeObjectValuePair.getValue();
302
303 URLConnection urlConnection = url.openConnection();
304
305 urlConnection.setDoOutput(true);
306
307 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
308 urlConnection.getOutputStream());
309
310 unsyncPrintWriter.write("transientToken=");
311 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
312 unsyncPrintWriter.write("&companyId=");
313 unsyncPrintWriter.write(String.valueOf(companyId));
314
315 unsyncPrintWriter.close();
316
317 inputStream = urlConnection.getInputStream();
318
319 return inputStream;
320 }
321 catch (IOException ioe) {
322 throw new SystemException(ioe);
323 }
324 }
325
326 public String[] getQueryTerms(Query query) {
327 String queryString = StringUtil.replace(
328 query.toString(), StringPool.STAR, StringPool.BLANK);
329
330 Query tempQuery = null;
331
332 try {
333 QueryParser queryParser = new QueryParser(
334 getVersion(), StringPool.BLANK, getAnalyzer());
335
336 tempQuery = queryParser.parse(queryString);
337 }
338 catch (Exception e) {
339 if (_log.isWarnEnabled()) {
340 _log.warn("Unable to parse " + queryString);
341 }
342
343 tempQuery = query;
344 }
345
346 WeightedTerm[] weightedTerms = null;
347
348 for (String fieldName : Field.KEYWORDS) {
349 weightedTerms = QueryTermExtractor.getTerms(
350 tempQuery, false, fieldName);
351
352 if (weightedTerms.length > 0) {
353 break;
354 }
355 }
356
357 Set<String> queryTerms = new HashSet<String>();
358
359 for (WeightedTerm weightedTerm : weightedTerms) {
360 queryTerms.add(weightedTerm.getTerm());
361 }
362
363 return queryTerms.toArray(new String[queryTerms.size()]);
364 }
365
366 public IndexSearcher getSearcher(long companyId, boolean readOnly)
367 throws IOException {
368
369 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
370
371 IndexSearcher indexSearcher = new IndexSearcher(
372 indexAccessor.getLuceneDir(), readOnly);
373
374 indexSearcher.setDefaultFieldSortScoring(true, true);
375 indexSearcher.setSimilarity(new FieldWeightSimilarity());
376
377 return indexSearcher;
378 }
379
380 public String getSnippet(
381 Query query, String field, String s, int maxNumFragments,
382 int fragmentLength, String fragmentSuffix, String preTag,
383 String postTag)
384 throws IOException {
385
386 SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
387 preTag, postTag);
388
389 QueryScorer queryScorer = new QueryScorer(query, field);
390
391 Highlighter highlighter = new Highlighter(
392 simpleHTMLFormatter, queryScorer);
393
394 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
395
396 TokenStream tokenStream = getAnalyzer().tokenStream(
397 field, new UnsyncStringReader(s));
398
399 try {
400 String snippet = highlighter.getBestFragments(
401 tokenStream, s, maxNumFragments, fragmentSuffix);
402
403 if (Validator.isNotNull(snippet) &&
404 !StringUtil.endsWith(snippet, fragmentSuffix)) {
405
406 snippet = snippet.concat(fragmentSuffix);
407 }
408
409 return snippet;
410 }
411 catch (InvalidTokenOffsetsException itoe) {
412 throw new IOException(itoe.getMessage());
413 }
414 }
415
416 public Version getVersion() {
417 return _version;
418 }
419
420 public boolean isLoadIndexFromClusterEnabled() {
421 if (PropsValues.CLUSTER_LINK_ENABLED &&
422 PropsValues.LUCENE_REPLICATE_WRITE) {
423
424 return true;
425 }
426
427 if (_log.isDebugEnabled()) {
428 _log.debug("Load index from cluster is not enabled");
429 }
430
431 return false;
432 }
433
434 public void loadIndex(long companyId, InputStream inputStream)
435 throws IOException {
436
437 if (!isLoadIndexFromClusterEnabled()) {
438 return;
439 }
440
441 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
442
443 if (indexAccessor == null) {
444 return;
445 }
446
447 indexAccessor.loadIndex(inputStream);
448 }
449
450 public Address selectBootupClusterAddress(
451 long companyId, long localLastGeneration)
452 throws SystemException {
453
454 if (!isLoadIndexFromClusterEnabled()) {
455 return null;
456 }
457
458 List<Address> clusterNodeAddresses =
459 ClusterExecutorUtil.getClusterNodeAddresses();
460
461 int clusterNodeAddressesCount = clusterNodeAddresses.size();
462
463 if (clusterNodeAddressesCount <= 1) {
464 if (_log.isDebugEnabled()) {
465 _log.debug(
466 "Do not load indexes because there is either one portal " +
467 "instance or no portal instances in the cluster");
468 }
469
470 return null;
471 }
472
473 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
474 new MethodHandler(_getLastGenerationMethodKey, companyId),
475 true);
476
477 FutureClusterResponses futureClusterResponses =
478 ClusterExecutorUtil.execute(clusterRequest);
479
480 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
481 futureClusterResponses.getPartialResults();
482
483 Address bootupAddress = null;
484
485 do{
486 clusterNodeAddressesCount--;
487
488 ClusterNodeResponse clusterNodeResponse = null;
489
490 try {
491 clusterNodeResponse = clusterNodeResponses.poll(
492 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
493 java.util.concurrent.TimeUnit.MILLISECONDS);
494 }
495 catch (Exception e) {
496 throw new SystemException(e);
497 }
498
499 if (clusterNodeResponse == null) {
500 if (_log.isDebugEnabled()) {
501 _log.debug(
502 "Unable to get cluster node response in " +
503 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
504 java.util.concurrent.TimeUnit.MILLISECONDS);
505 }
506
507 continue;
508 }
509
510 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
511
512 if (clusterNode.getPort() > 0) {
513 try {
514 long remoteLastGeneration =
515 (Long)clusterNodeResponse.getResult();
516
517 if (remoteLastGeneration > localLastGeneration) {
518 bootupAddress = clusterNodeResponse.getAddress();
519
520 break;
521 }
522 }
523 catch (Exception e) {
524 if (_log.isDebugEnabled()) {
525 _log.debug(
526 "Suppress exception caused by remote method " +
527 "invocation",
528 e);
529 }
530
531 continue;
532 }
533 }
534 else {
535 if (_log.isDebugEnabled()) {
536 _log.debug(
537 "Cluster node " + clusterNode + " has invalid port");
538 }
539 }
540 } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
541
542 return bootupAddress;
543 }
544
545 public void setAnalyzer(Analyzer analyzer) {
546 _analyzer = analyzer;
547 }
548
549 public void setVersion(Version version) {
550 _version = version;
551 }
552
553 public void shutdown() {
554 if (_luceneIndexThreadPoolExecutor != null) {
555 _luceneIndexThreadPoolExecutor.shutdownNow();
556
557 try {
558 _luceneIndexThreadPoolExecutor.awaitTermination(
559 60, java.util.concurrent.TimeUnit.SECONDS);
560 }
561 catch (InterruptedException ie) {
562 _log.error("Lucene indexer shutdown interrupted", ie);
563 }
564 }
565
566 if (isLoadIndexFromClusterEnabled()) {
567 ClusterExecutorUtil.removeClusterEventListener(
568 _loadIndexClusterEventListener);
569 }
570
571 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
572 indexAccessor.close();
573 }
574 }
575
576 public void startup(long companyId) {
577 if (PropsValues.INDEX_ON_STARTUP) {
578 if (_log.isInfoEnabled()) {
579 _log.info("Indexing Lucene on startup");
580 }
581
582 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
583
584 if (PropsValues.INDEX_WITH_THREAD) {
585 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
586 }
587 else {
588 luceneIndexer.reindex();
589 }
590 }
591
592 if (PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {
593 SchedulerEntry schedulerEntry = new SchedulerEntryImpl();
594
595 schedulerEntry.setEventListenerClass(
596 CleanUpMessageListener.class.getName());
597 schedulerEntry.setTimeUnit(TimeUnit.MINUTE);
598 schedulerEntry.setTriggerType(TriggerType.SIMPLE);
599 schedulerEntry.setTriggerValue(
600 PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL);
601
602 try {
603 SchedulerEngineUtil.schedule(
604 schedulerEntry, StorageType.MEMORY,
605 PortalClassLoaderUtil.getClassLoader(), 0);
606 }
607 catch (Exception e) {
608 _log.error(e, e);
609 }
610 }
611 }
612
613 public void updateDocument(long companyId, Term term, Document document)
614 throws IOException {
615
616 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
617
618 indexAccessor.updateDocument(term, document);
619 }
620
621 private LuceneHelperImpl() {
622 if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
623 _luceneIndexThreadPoolExecutor =
624 PortalExecutorManagerUtil.getPortalExecutor(
625 LuceneHelperImpl.class.getName());
626 }
627
628 if (isLoadIndexFromClusterEnabled()) {
629 _loadIndexClusterEventListener =
630 new LoadIndexClusterEventListener();
631
632 ClusterExecutorUtil.addClusterEventListener(
633 _loadIndexClusterEventListener);
634 }
635 }
636
637 private ObjectValuePair<String, URL>
638 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
639 throws SystemException {
640
641 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
642 new MethodHandler(
643 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
644 bootupAddress);
645
646 FutureClusterResponses futureClusterResponses =
647 ClusterExecutorUtil.execute(clusterRequest);
648
649 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
650 futureClusterResponses.getPartialResults();
651
652 try {
653 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
654 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
655 java.util.concurrent.TimeUnit.MILLISECONDS);
656
657 String transientToken = (String)clusterNodeResponse.getResult();
658
659 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
660
661 InetAddress inetAddress = clusterNode.getInetAddress();
662
663 URL url = new URL(
664 "http", inetAddress.getHostAddress(),
665 clusterNode.getPort(), "/lucene/dump");
666
667 return new ObjectValuePair<String, URL>(transientToken, url);
668 }
669 catch (Exception e) {
670 throw new SystemException(e);
671 }
672 }
673
674 private IndexAccessor _getIndexAccessor(long companyId) {
675 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
676
677 if (indexAccessor != null) {
678 return indexAccessor;
679 }
680
681 synchronized (this) {
682 indexAccessor = _indexAccessors.get(companyId);
683
684 if (indexAccessor == null) {
685 indexAccessor = new IndexAccessorImpl(companyId);
686
687 if (isLoadIndexFromClusterEnabled()) {
688 InputStream inputStream = null;
689
690 try {
691 Address bootupAddress = selectBootupClusterAddress(
692 companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
693
694 if (bootupAddress != null) {
695 inputStream = getLoadIndexesInputStreamFromCluster(
696 companyId, bootupAddress);
697
698 indexAccessor.loadIndex(inputStream);
699 }
700
701 indexAccessor.enableDumpIndex();
702 }
703 catch (Exception e) {
704 _log.error(
705 "Unable to load index for company " +
706 indexAccessor.getCompanyId(),
707 e);
708 }
709 finally {
710 if (inputStream != null) {
711 try {
712 inputStream.close();
713 }
714 catch (IOException ioe) {
715 _log.error(
716 "Unable to close input stream for " +
717 "company " +
718 indexAccessor.getCompanyId(),
719 ioe);
720 }
721 }
722 }
723 }
724
725 _indexAccessors.put(companyId, indexAccessor);
726 }
727 }
728
729 return indexAccessor;
730 }
731
732 private void _includeIfUnique(
733 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
734 boolean like) {
735
736 if (query instanceof TermQuery) {
737 Set<Term> terms = new HashSet<Term>();
738
739 query.extractTerms(terms);
740
741 for (Term term : terms) {
742 String termValue = term.text();
743
744 if (like) {
745 term = term.createTerm(
746 StringPool.STAR.concat(termValue).concat(
747 StringPool.STAR));
748
749 query = new WildcardQuery(term);
750 }
751 else {
752 query = new TermQuery(term);
753 }
754
755 boolean included = false;
756
757 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
758 if (query.equals(booleanClause.getQuery())) {
759 included = true;
760 }
761 }
762
763 if (!included) {
764 booleanQuery.add(query, occur);
765 }
766 }
767 }
768 else if (query instanceof BooleanQuery) {
769 BooleanQuery curBooleanQuery = (BooleanQuery)query;
770
771 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
772 _includeIfUnique(
773 booleanQuery, booleanClause.getQuery(),
774 booleanClause.getOccur(), like);
775 }
776 }
777 else {
778 boolean included = false;
779
780 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
781 if (query.equals(booleanClause.getQuery())) {
782 included = true;
783 }
784 }
785
786 if (!included) {
787 booleanQuery.add(query, occur);
788 }
789 }
790 }
791
792 private class LoadIndexClusterEventListener
793 implements ClusterEventListener {
794
795 public void processClusterEvent(ClusterEvent clusterEvent) {
796 ClusterEventType clusterEventType =
797 clusterEvent.getClusterEventType();
798
799 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
800 return;
801 }
802
803 List<Address> clusterNodeAddresses =
804 ClusterExecutorUtil.getClusterNodeAddresses();
805 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
806
807 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
808 if (_log.isDebugEnabled()) {
809 _log.debug(
810 "Number of original cluster members is greater than " +
811 "one");
812 }
813
814 return;
815 }
816
817 long[] companyIds = PortalInstances.getCompanyIds();
818
819 for (long companyId : companyIds) {
820 loadIndexes(companyId);
821 }
822
823 loadIndexes(CompanyConstants.SYSTEM);
824 }
825
826 private void loadIndexes(long companyId) {
827 long lastGeneration = getLastGeneration(companyId);
828
829 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
830 return;
831 }
832
833 try {
834 LuceneClusterUtil.loadIndexesFromCluster(companyId);
835 }
836 catch (Exception e) {
837 _log.error(
838 "Unable to load indexes for company " + companyId, e);
839 }
840 }
841
842 }
843
844 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
845
846 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
847
848 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
849
850 private static MethodKey _createTokenMethodKey =
851 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
852 long.class);
853 private static MethodKey _getLastGenerationMethodKey =
854 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
855 long.class);
856
857 private Analyzer _analyzer;
858 private Map<Long, IndexAccessor> _indexAccessors =
859 new ConcurrentHashMap<Long, IndexAccessor>();
860 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
861 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
862 private Version _version;
863
864 }