001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEntry;
035 import com.liferay.portal.kernel.scheduler.SchedulerEntryImpl;
036 import com.liferay.portal.kernel.scheduler.StorageType;
037 import com.liferay.portal.kernel.scheduler.TimeUnit;
038 import com.liferay.portal.kernel.scheduler.TriggerType;
039 import com.liferay.portal.kernel.search.BooleanClauseOccur;
040 import com.liferay.portal.kernel.search.Field;
041 import com.liferay.portal.kernel.util.ArrayUtil;
042 import com.liferay.portal.kernel.util.GetterUtil;
043 import com.liferay.portal.kernel.util.MethodHandler;
044 import com.liferay.portal.kernel.util.MethodKey;
045 import com.liferay.portal.kernel.util.ObjectValuePair;
046 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
047 import com.liferay.portal.kernel.util.StringPool;
048 import com.liferay.portal.kernel.util.StringUtil;
049 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
050 import com.liferay.portal.kernel.util.Validator;
051 import com.liferay.portal.model.CompanyConstants;
052 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
053 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
054 import com.liferay.portal.search.lucene.messaging.CleanUpMessageListener;
055 import com.liferay.portal.security.auth.TransientTokenUtil;
056 import com.liferay.portal.util.PortalInstances;
057 import com.liferay.portal.util.PropsValues;
058
059 import java.io.IOException;
060 import java.io.InputStream;
061 import java.io.OutputStream;
062
063 import java.net.InetAddress;
064 import java.net.URL;
065 import java.net.URLConnection;
066
067 import java.util.HashSet;
068 import java.util.List;
069 import java.util.Map;
070 import java.util.Set;
071 import java.util.concurrent.BlockingQueue;
072 import java.util.concurrent.ConcurrentHashMap;
073
074 import org.apache.lucene.analysis.Analyzer;
075 import org.apache.lucene.analysis.TokenStream;
076 import org.apache.lucene.document.Document;
077 import org.apache.lucene.index.Term;
078 import org.apache.lucene.queryParser.QueryParser;
079 import org.apache.lucene.search.BooleanClause;
080 import org.apache.lucene.search.BooleanQuery;
081 import org.apache.lucene.search.IndexSearcher;
082 import org.apache.lucene.search.NumericRangeQuery;
083 import org.apache.lucene.search.Query;
084 import org.apache.lucene.search.TermQuery;
085 import org.apache.lucene.search.TermRangeQuery;
086 import org.apache.lucene.search.WildcardQuery;
087 import org.apache.lucene.search.highlight.Highlighter;
088 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
089 import org.apache.lucene.search.highlight.QueryScorer;
090 import org.apache.lucene.search.highlight.SimpleFragmenter;
091 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
092 import org.apache.lucene.search.highlight.WeightedTerm;
093 import org.apache.lucene.util.Version;
094
095
103 public class LuceneHelperImpl implements LuceneHelper {
104
105 public void addDocument(long companyId, Document document)
106 throws IOException {
107
108 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
109
110 indexAccessor.addDocument(document);
111 }
112
113 public void addExactTerm(
114 BooleanQuery booleanQuery, String field, String value) {
115
116 addTerm(booleanQuery, field, value, false);
117 }
118
119 public void addNumericRangeTerm(
120 BooleanQuery booleanQuery, String field, String startValue,
121 String endValue) {
122
123 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
124 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
125 true, true);
126
127 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
128 }
129
130 public void addRangeTerm(
131 BooleanQuery booleanQuery, String field, String startValue,
132 String endValue) {
133
134 boolean includesLower = true;
135
136 if (startValue.equals(StringPool.STAR)) {
137 includesLower = false;
138 }
139
140 boolean includesUpper = true;
141
142 if (endValue.equals(StringPool.STAR)) {
143 includesUpper = false;
144 }
145
146 TermRangeQuery termRangeQuery = new TermRangeQuery(
147 field, startValue, endValue, includesLower, includesUpper);
148
149 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
150 }
151
152 public void addRequiredTerm(
153 BooleanQuery booleanQuery, String field, String value, boolean like) {
154
155 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
156 }
157
158 public void addRequiredTerm(
159 BooleanQuery booleanQuery, String field, String[] values,
160 boolean like) {
161
162 if (values == null) {
163 return;
164 }
165
166 BooleanQuery query = new BooleanQuery();
167
168 for (String value : values) {
169 addTerm(query, field, value, like);
170 }
171
172 booleanQuery.add(query, BooleanClause.Occur.MUST);
173 }
174
175 public void addTerm(
176 BooleanQuery booleanQuery, String field, String value, boolean like) {
177
178 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
179 }
180
181 public void addTerm(
182 BooleanQuery booleanQuery, String field, String value, boolean like,
183 BooleanClauseOccur booleanClauseOccur) {
184
185 if (Validator.isNull(value)) {
186 return;
187 }
188
189 if (like) {
190 value = StringUtil.replace(
191 value, StringPool.PERCENT, StringPool.BLANK);
192 }
193
194 try {
195 QueryParser queryParser = new QueryParser(
196 getVersion(), field, getAnalyzer());
197
198 Query query = queryParser.parse(value);
199
200 BooleanClause.Occur occur = null;
201
202 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
203 occur = BooleanClause.Occur.MUST;
204 }
205 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
206 occur = BooleanClause.Occur.MUST_NOT;
207 }
208 else {
209 occur = BooleanClause.Occur.SHOULD;
210 }
211
212 _includeIfUnique(booleanQuery, query, occur, like);
213 }
214 catch (Exception e) {
215 _log.error(e, e);
216 }
217 }
218
219 public void addTerm(
220 BooleanQuery booleanQuery, String field, String[] values,
221 boolean like) {
222
223 for (String value : values) {
224 addTerm(booleanQuery, field, value, like);
225 }
226 }
227
228 public int countScoredFieldNames(Query query, String[] filedNames) {
229 int count = 0;
230
231 for (String fieldName : filedNames) {
232 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
233 query, false, fieldName);
234
235 if ((weightedTerms.length > 0) &&
236 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
237
238 count++;
239 }
240 }
241
242 return count;
243 }
244
245 public void delete(long companyId) {
246 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
247
248 if (indexAccessor == null) {
249 return;
250 }
251
252 indexAccessor.delete();
253 }
254
255 public void deleteDocuments(long companyId, Term term) throws IOException {
256 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
257
258 if (indexAccessor == null) {
259 return;
260 }
261
262 indexAccessor.deleteDocuments(term);
263 }
264
265 public void dumpIndex(long companyId, OutputStream outputStream)
266 throws IOException {
267
268 long lastGeneration = getLastGeneration(companyId);
269
270 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
271 if (_log.isDebugEnabled()) {
272 _log.debug(
273 "Dump index from cluster is not enabled for " + companyId);
274 }
275
276 return;
277 }
278
279 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
280
281 if (indexAccessor == null) {
282 return;
283 }
284
285 indexAccessor.dumpIndex(outputStream);
286 }
287
288 public Analyzer getAnalyzer() {
289 return _analyzer;
290 }
291
292 public long getLastGeneration(long companyId) {
293 if (!isLoadIndexFromClusterEnabled()) {
294 return IndexAccessor.DEFAULT_LAST_GENERATION;
295 }
296
297 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
298
299 if (indexAccessor == null) {
300 return IndexAccessor.DEFAULT_LAST_GENERATION;
301 }
302
303 return indexAccessor.getLastGeneration();
304 }
305
306 public InputStream getLoadIndexesInputStreamFromCluster(
307 long companyId, Address bootupAddress)
308 throws SystemException {
309
310 if (!isLoadIndexFromClusterEnabled()) {
311 return null;
312 }
313
314 InputStream inputStream = null;
315
316 try {
317 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
318 _getBootupClusterNodeObjectValuePair(bootupAddress);
319
320 URL url = bootupClusterNodeObjectValuePair.getValue();
321
322 URLConnection urlConnection = url.openConnection();
323
324 urlConnection.setDoOutput(true);
325
326 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
327 urlConnection.getOutputStream());
328
329 unsyncPrintWriter.write("transientToken=");
330 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
331 unsyncPrintWriter.write("&companyId=");
332 unsyncPrintWriter.write(String.valueOf(companyId));
333
334 unsyncPrintWriter.close();
335
336 inputStream = urlConnection.getInputStream();
337
338 return inputStream;
339 }
340 catch (IOException ioe) {
341 throw new SystemException(ioe);
342 }
343 }
344
345 public String[] getQueryTerms(Query query) {
346 String queryString = StringUtil.replace(
347 query.toString(), StringPool.STAR, StringPool.BLANK);
348
349 Query tempQuery = null;
350
351 try {
352 QueryParser queryParser = new QueryParser(
353 getVersion(), StringPool.BLANK, getAnalyzer());
354
355 tempQuery = queryParser.parse(queryString);
356 }
357 catch (Exception e) {
358 if (_log.isWarnEnabled()) {
359 _log.warn("Unable to parse " + queryString);
360 }
361
362 tempQuery = query;
363 }
364
365 WeightedTerm[] weightedTerms = null;
366
367 for (String fieldName : Field.KEYWORDS) {
368 weightedTerms = QueryTermExtractor.getTerms(
369 tempQuery, false, fieldName);
370
371 if (weightedTerms.length > 0) {
372 break;
373 }
374 }
375
376 Set<String> queryTerms = new HashSet<String>();
377
378 for (WeightedTerm weightedTerm : weightedTerms) {
379 queryTerms.add(weightedTerm.getTerm());
380 }
381
382 return queryTerms.toArray(new String[queryTerms.size()]);
383 }
384
385 public IndexSearcher getSearcher(long companyId, boolean readOnly)
386 throws IOException {
387
388 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
389
390 IndexSearcher indexSearcher = new IndexSearcher(
391 indexAccessor.getLuceneDir(), readOnly);
392
393 indexSearcher.setDefaultFieldSortScoring(true, true);
394 indexSearcher.setSimilarity(new FieldWeightSimilarity());
395
396 return indexSearcher;
397 }
398
399 public String getSnippet(
400 Query query, String field, String s, int maxNumFragments,
401 int fragmentLength, String fragmentSuffix, String preTag,
402 String postTag)
403 throws IOException {
404
405 SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
406 preTag, postTag);
407
408 QueryScorer queryScorer = new QueryScorer(query, field);
409
410 Highlighter highlighter = new Highlighter(
411 simpleHTMLFormatter, queryScorer);
412
413 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
414
415 TokenStream tokenStream = getAnalyzer().tokenStream(
416 field, new UnsyncStringReader(s));
417
418 try {
419 String snippet = highlighter.getBestFragments(
420 tokenStream, s, maxNumFragments, fragmentSuffix);
421
422 if (Validator.isNotNull(snippet) &&
423 !StringUtil.endsWith(snippet, fragmentSuffix)) {
424
425 snippet = snippet.concat(fragmentSuffix);
426 }
427
428 return snippet;
429 }
430 catch (InvalidTokenOffsetsException itoe) {
431 throw new IOException(itoe.getMessage());
432 }
433 }
434
435 public Version getVersion() {
436 return _version;
437 }
438
439 public boolean isLoadIndexFromClusterEnabled() {
440 if (PropsValues.CLUSTER_LINK_ENABLED &&
441 PropsValues.LUCENE_REPLICATE_WRITE) {
442
443 return true;
444 }
445
446 if (_log.isDebugEnabled()) {
447 _log.debug("Load index from cluster is not enabled");
448 }
449
450 return false;
451 }
452
453 public void loadIndex(long companyId, InputStream inputStream)
454 throws IOException {
455
456 if (!isLoadIndexFromClusterEnabled()) {
457 return;
458 }
459
460 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
461
462 if (indexAccessor == null) {
463 return;
464 }
465
466 indexAccessor.loadIndex(inputStream);
467 }
468
469 public Address selectBootupClusterAddress(
470 long companyId, long localLastGeneration)
471 throws SystemException {
472
473 if (!isLoadIndexFromClusterEnabled()) {
474 return null;
475 }
476
477 List<Address> clusterNodeAddresses =
478 ClusterExecutorUtil.getClusterNodeAddresses();
479
480 int clusterNodeAddressesCount = clusterNodeAddresses.size();
481
482 if (clusterNodeAddressesCount <= 1) {
483 if (_log.isDebugEnabled()) {
484 _log.debug(
485 "Do not load indexes because there is either one portal " +
486 "instance or no portal instances in the cluster");
487 }
488
489 return null;
490 }
491
492 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
493 new MethodHandler(_getLastGenerationMethodKey, companyId),
494 true);
495
496 FutureClusterResponses futureClusterResponses =
497 ClusterExecutorUtil.execute(clusterRequest);
498
499 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
500 futureClusterResponses.getPartialResults();
501
502 Address bootupAddress = null;
503
504 do {
505 clusterNodeAddressesCount--;
506
507 ClusterNodeResponse clusterNodeResponse = null;
508
509 try {
510 clusterNodeResponse = clusterNodeResponses.poll(
511 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
512 java.util.concurrent.TimeUnit.MILLISECONDS);
513 }
514 catch (Exception e) {
515 throw new SystemException(e);
516 }
517
518 if (clusterNodeResponse == null) {
519 if (_log.isDebugEnabled()) {
520 _log.debug(
521 "Unable to get cluster node response in " +
522 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
523 java.util.concurrent.TimeUnit.MILLISECONDS);
524 }
525
526 continue;
527 }
528
529 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
530
531 if (clusterNode.getPort() > 0) {
532 try {
533 long remoteLastGeneration =
534 (Long)clusterNodeResponse.getResult();
535
536 if (remoteLastGeneration > localLastGeneration) {
537 bootupAddress = clusterNodeResponse.getAddress();
538
539 break;
540 }
541 }
542 catch (Exception e) {
543 if (_log.isDebugEnabled()) {
544 _log.debug(
545 "Suppress exception caused by remote method " +
546 "invocation",
547 e);
548 }
549
550 continue;
551 }
552 }
553 else {
554 if (_log.isDebugEnabled()) {
555 _log.debug(
556 "Cluster node " + clusterNode + " has invalid port");
557 }
558 }
559 } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
560
561 return bootupAddress;
562 }
563
564 public void setAnalyzer(Analyzer analyzer) {
565 _analyzer = analyzer;
566 }
567
568 public void setVersion(Version version) {
569 _version = version;
570 }
571
572 public void shutdown() {
573 if (_luceneIndexThreadPoolExecutor != null) {
574 _luceneIndexThreadPoolExecutor.shutdownNow();
575
576 try {
577 _luceneIndexThreadPoolExecutor.awaitTermination(
578 60, java.util.concurrent.TimeUnit.SECONDS);
579 }
580 catch (InterruptedException ie) {
581 _log.error("Lucene indexer shutdown interrupted", ie);
582 }
583 }
584
585 if (isLoadIndexFromClusterEnabled()) {
586 ClusterExecutorUtil.removeClusterEventListener(
587 _loadIndexClusterEventListener);
588 }
589
590 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
591 indexAccessor.close();
592 }
593 }
594
595 public void startup(long companyId) {
596 if (PropsValues.INDEX_ON_STARTUP) {
597 if (_log.isInfoEnabled()) {
598 _log.info("Indexing Lucene on startup");
599 }
600
601 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
602
603 if (PropsValues.INDEX_WITH_THREAD) {
604 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
605 }
606 else {
607 luceneIndexer.reindex();
608 }
609 }
610
611 if (PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {
612 SchedulerEntry schedulerEntry = new SchedulerEntryImpl();
613
614 schedulerEntry.setEventListenerClass(
615 CleanUpMessageListener.class.getName());
616 schedulerEntry.setTimeUnit(TimeUnit.MINUTE);
617 schedulerEntry.setTriggerType(TriggerType.SIMPLE);
618 schedulerEntry.setTriggerValue(
619 PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL);
620
621 try {
622 SchedulerEngineUtil.schedule(
623 schedulerEntry, StorageType.MEMORY,
624 PortalClassLoaderUtil.getClassLoader(), 0);
625 }
626 catch (Exception e) {
627 _log.error(e, e);
628 }
629 }
630 }
631
632 public void updateDocument(long companyId, Term term, Document document)
633 throws IOException {
634
635 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
636
637 indexAccessor.updateDocument(term, document);
638 }
639
/**
 * Creates the helper.
 *
 * <p>
 * The indexing thread pool is obtained only when both
 * <code>PropsValues.INDEX_ON_STARTUP</code> and
 * <code>PropsValues.INDEX_WITH_THREAD</code> are set. When loading indexes
 * from the cluster is enabled, a cluster event listener is created and
 * registered with the cluster executor.
 * </p>
 */
private LuceneHelperImpl() {
    boolean indexWithThreadOnStartup =
        PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

    if (indexWithThreadOnStartup) {
        _luceneIndexThreadPoolExecutor =
            PortalExecutorManagerUtil.getPortalExecutor(
                LuceneHelperImpl.class.getName());
    }

    if (!isLoadIndexFromClusterEnabled()) {
        return;
    }

    _loadIndexClusterEventListener = new LoadIndexClusterEventListener();

    ClusterExecutorUtil.addClusterEventListener(
        _loadIndexClusterEventListener);
}
655
656 private ObjectValuePair<String, URL>
657 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
658 throws SystemException {
659
660 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
661 new MethodHandler(
662 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
663 bootupAddress);
664
665 FutureClusterResponses futureClusterResponses =
666 ClusterExecutorUtil.execute(clusterRequest);
667
668 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
669 futureClusterResponses.getPartialResults();
670
671 try {
672 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
673 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
674 java.util.concurrent.TimeUnit.MILLISECONDS);
675
676 String transientToken = (String)clusterNodeResponse.getResult();
677
678 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
679
680 InetAddress inetAddress = clusterNode.getInetAddress();
681
682 URL url = new URL(
683 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
684 "/lucene/dump");
685
686 return new ObjectValuePair<String, URL>(transientToken, url);
687 }
688 catch (Exception e) {
689 throw new SystemException(e);
690 }
691 }
692
693 private IndexAccessor _getIndexAccessor(long companyId) {
694 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
695
696 if (indexAccessor != null) {
697 return indexAccessor;
698 }
699
700 synchronized (this) {
701 indexAccessor = _indexAccessors.get(companyId);
702
703 if (indexAccessor == null) {
704 indexAccessor = new IndexAccessorImpl(companyId);
705
706 if (isLoadIndexFromClusterEnabled()) {
707 InputStream inputStream = null;
708
709 try {
710 Address bootupAddress = selectBootupClusterAddress(
711 companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
712
713 if (bootupAddress != null) {
714 inputStream = getLoadIndexesInputStreamFromCluster(
715 companyId, bootupAddress);
716
717 indexAccessor.loadIndex(inputStream);
718 }
719
720 indexAccessor.enableDumpIndex();
721 }
722 catch (Exception e) {
723 _log.error(
724 "Unable to load index for company " +
725 indexAccessor.getCompanyId(),
726 e);
727 }
728 finally {
729 if (inputStream != null) {
730 try {
731 inputStream.close();
732 }
733 catch (IOException ioe) {
734 _log.error(
735 "Unable to close input stream for " +
736 "company " +
737 indexAccessor.getCompanyId(),
738 ioe);
739 }
740 }
741 }
742 }
743
744 _indexAccessors.put(companyId, indexAccessor);
745 }
746 }
747
748 return indexAccessor;
749 }
750
751 private void _includeIfUnique(
752 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
753 boolean like) {
754
755 if (query instanceof TermQuery) {
756 Set<Term> terms = new HashSet<Term>();
757
758 query.extractTerms(terms);
759
760 for (Term term : terms) {
761 String termValue = term.text();
762
763 if (like) {
764 term = term.createTerm(
765 StringPool.STAR.concat(termValue).concat(
766 StringPool.STAR));
767
768 query = new WildcardQuery(term);
769 }
770 else {
771 query = new TermQuery(term);
772 }
773
774 boolean included = false;
775
776 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
777 if (query.equals(booleanClause.getQuery())) {
778 included = true;
779 }
780 }
781
782 if (!included) {
783 booleanQuery.add(query, occur);
784 }
785 }
786 }
787 else if (query instanceof BooleanQuery) {
788 BooleanQuery curBooleanQuery = (BooleanQuery)query;
789
790 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
791 _includeIfUnique(
792 booleanQuery, booleanClause.getQuery(),
793 booleanClause.getOccur(), like);
794 }
795 }
796 else {
797 boolean included = false;
798
799 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
800 if (query.equals(booleanClause.getQuery())) {
801 included = true;
802 }
803 }
804
805 if (!included) {
806 booleanQuery.add(query, occur);
807 }
808 }
809 }
810
811 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
812
813 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
814
815 private static MethodKey _createTokenMethodKey =
816 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
817 long.class);
818 private static MethodKey _getLastGenerationMethodKey =
819 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
820 long.class);
821
822 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
823
824 private Analyzer _analyzer;
825 private Map<Long, IndexAccessor> _indexAccessors =
826 new ConcurrentHashMap<Long, IndexAccessor>();
827 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
828 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
829 private Version _version;
830
831 private class LoadIndexClusterEventListener
832 implements ClusterEventListener {
833
834 public void processClusterEvent(ClusterEvent clusterEvent) {
835 ClusterEventType clusterEventType =
836 clusterEvent.getClusterEventType();
837
838 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
839 return;
840 }
841
842 List<Address> clusterNodeAddresses =
843 ClusterExecutorUtil.getClusterNodeAddresses();
844 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
845
846 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
847 if (_log.isDebugEnabled()) {
848 _log.debug(
849 "Number of original cluster members is greater than " +
850 "one");
851 }
852
853 return;
854 }
855
856 long[] companyIds = PortalInstances.getCompanyIds();
857
858 for (long companyId : companyIds) {
859 loadIndexes(companyId);
860 }
861
862 loadIndexes(CompanyConstants.SYSTEM);
863 }
864
865 private void loadIndexes(long companyId) {
866 long lastGeneration = getLastGeneration(companyId);
867
868 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
869 return;
870 }
871
872 try {
873 LuceneClusterUtil.loadIndexesFromCluster(companyId);
874 }
875 catch (Exception e) {
876 _log.error(
877 "Unable to load indexes for company " + companyId, e);
878 }
879 }
880
881 }
882
883 }