001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.PropsKeys;
044    import com.liferay.portal.kernel.util.StringPool;
045    import com.liferay.portal.kernel.util.StringUtil;
046    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047    import com.liferay.portal.kernel.util.Validator;
048    import com.liferay.portal.model.CompanyConstants;
049    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051    import com.liferay.portal.security.auth.TransientTokenUtil;
052    import com.liferay.portal.util.PortalInstances;
053    import com.liferay.portal.util.PortalUtil;
054    import com.liferay.portal.util.PropsUtil;
055    import com.liferay.portal.util.PropsValues;
056    import com.liferay.util.lucene.KeywordsUtil;
057    
058    import java.io.IOException;
059    import java.io.InputStream;
060    import java.io.OutputStream;
061    
062    import java.net.InetAddress;
063    import java.net.URL;
064    import java.net.URLConnection;
065    
066    import java.util.HashSet;
067    import java.util.List;
068    import java.util.Map;
069    import java.util.Set;
070    import java.util.concurrent.BlockingQueue;
071    import java.util.concurrent.ConcurrentHashMap;
072    import java.util.concurrent.TimeUnit;
073    import java.util.concurrent.TimeoutException;
074    
075    import org.apache.commons.lang.time.StopWatch;
076    import org.apache.lucene.analysis.Analyzer;
077    import org.apache.lucene.analysis.TokenStream;
078    import org.apache.lucene.document.Document;
079    import org.apache.lucene.index.IndexReader;
080    import org.apache.lucene.index.Term;
081    import org.apache.lucene.queryParser.QueryParser;
082    import org.apache.lucene.search.BooleanClause;
083    import org.apache.lucene.search.BooleanQuery;
084    import org.apache.lucene.search.IndexSearcher;
085    import org.apache.lucene.search.NumericRangeQuery;
086    import org.apache.lucene.search.Query;
087    import org.apache.lucene.search.TermQuery;
088    import org.apache.lucene.search.TermRangeQuery;
089    import org.apache.lucene.search.WildcardQuery;
090    import org.apache.lucene.search.highlight.Formatter;
091    import org.apache.lucene.search.highlight.Highlighter;
092    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
093    import org.apache.lucene.search.highlight.QueryScorer;
094    import org.apache.lucene.search.highlight.SimpleFragmenter;
095    import org.apache.lucene.search.highlight.WeightedTerm;
096    import org.apache.lucene.util.Version;
097    
098    /**
099     * @author Brian Wing Shun Chan
100     * @author Harry Mark
101     * @author Bruno Farache
102     * @author Shuyang Zhou
103     * @author Tina Tian
104     * @author Hugo Huijser
105     */
106    public class LuceneHelperImpl implements LuceneHelper {
107    
108            @Override
109            public void addDocument(long companyId, Document document)
110                    throws IOException {
111    
112                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
113    
114                    indexAccessor.addDocument(document);
115            }
116    
117            @Override
118            public void addExactTerm(
119                    BooleanQuery booleanQuery, String field, String value) {
120    
121                    addTerm(booleanQuery, field, value, false);
122            }
123    
124            @Override
125            public void addNumericRangeTerm(
126                    BooleanQuery booleanQuery, String field, String startValue,
127                    String endValue) {
128    
129                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
130                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
131                            true, true);
132    
133                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134            }
135    
136            @Override
137            public void addRangeTerm(
138                    BooleanQuery booleanQuery, String field, String startValue,
139                    String endValue) {
140    
141                    boolean includesLower = true;
142    
143                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
144                            includesLower = false;
145                    }
146    
147                    boolean includesUpper = true;
148    
149                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
150                            includesUpper = false;
151                    }
152    
153                    TermRangeQuery termRangeQuery = new TermRangeQuery(
154                            field, startValue, endValue, includesLower, includesUpper);
155    
156                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
157            }
158    
159            @Override
160            public void addRequiredTerm(
161                    BooleanQuery booleanQuery, String field, String value, boolean like) {
162    
163                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
164            }
165    
166            @Override
167            public void addRequiredTerm(
168                    BooleanQuery booleanQuery, String field, String[] values,
169                    boolean like) {
170    
171                    if (values == null) {
172                            return;
173                    }
174    
175                    BooleanQuery query = new BooleanQuery();
176    
177                    for (String value : values) {
178                            addTerm(query, field, value, like);
179                    }
180    
181                    booleanQuery.add(query, BooleanClause.Occur.MUST);
182            }
183    
184            @Override
185            public void addTerm(
186                    BooleanQuery booleanQuery, String field, String value, boolean like) {
187    
188                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
189            }
190    
        /**
         * Parses <code>value</code> with a QueryParser bound to
         * <code>field</code> and adds the resulting query to
         * <code>booleanQuery</code> with the requested occurrence.
         *
         * <p>
         * Values for which <code>Validator.isNull</code> is true are ignored.
         * If the configured analyzer is a PerFieldAnalyzer whose analyzer for
         * this field is a LikeKeywordAnalyzer, "like" handling is forced on.
         * In "like" mode, '%' characters are removed from the value before
         * parsing. Any exception thrown while parsing or adding is logged at
         * warn level, and no clause is added.
         * </p>
         *
         * @param booleanQuery the query receiving the clause
         * @param field the field name used as the parser's default field
         * @param value the raw value to parse
         * @param like whether "like" handling is applied
         * @param booleanClauseOccur MUST or MUST_NOT map to the Lucene
         *        equivalents. Any other value maps to SHOULD. A
         *        <code>null</code> value raises an exception that is caught
         *        and logged below.
         */
        @Override
        public void addTerm(
                BooleanQuery booleanQuery, String field, String value, boolean like,
                BooleanClauseOccur booleanClauseOccur) {

                if (Validator.isNull(value)) {
                        return;
                }

                Analyzer analyzer = getAnalyzer();

                // Fields mapped to a LikeKeywordAnalyzer always get "like" handling

                if (analyzer instanceof PerFieldAnalyzer) {
                        PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;

                        Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);

                        if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
                                like = true;
                        }
                }

                // In "like" mode, remove '%' characters before parsing

                if (like) {
                        value = StringUtil.replace(
                                value, StringPool.PERCENT, StringPool.BLANK);
                }

                try {
                        QueryParser queryParser = new QueryParser(
                                getVersion(), field, analyzer);

                        Query query = queryParser.parse(value);

                        try {
                                if (like && (query instanceof TermQuery)) {

                                        // LUCENE-89

                                        // Rebuild a single-term query with its text lowercased
                                        // using the parser's locale, keeping the original boost

                                        TermQuery termQuery = (TermQuery)query;

                                        Term term = termQuery.getTerm();

                                        value = term.text();
                                        value = value.toLowerCase(queryParser.getLocale());

                                        query = new TermQuery(term.createTerm(value));

                                        query.setBoost(termQuery.getBoost());
                                }
                        }
                        catch (Exception e) {

                                // Fall back to parsing the escaped value. NOTE(review):
                                // value may already hold the lowercased term text here

                                query = queryParser.parse(KeywordsUtil.escape(value));
                        }

                        // Map the portal's occurrence type onto Lucene's

                        BooleanClause.Occur occur = null;

                        if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
                                occur = BooleanClause.Occur.MUST;
                        }
                        else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
                                occur = BooleanClause.Occur.MUST_NOT;
                        }
                        else {
                                occur = BooleanClause.Occur.SHOULD;
                        }

                        // _includeIfUnique is defined elsewhere in this class; presumably
                        // it skips clauses already present in booleanQuery. TODO confirm

                        _includeIfUnique(booleanQuery, query, occur, like);
                }
                catch (Exception e) {

                        // Best effort: a value that cannot be parsed is logged and skipped

                        if (_log.isWarnEnabled()) {
                                _log.warn(e, e);
                        }
                }
        }
264    
265            @Override
266            public void addTerm(
267                    BooleanQuery booleanQuery, String field, String[] values,
268                    boolean like) {
269    
270                    for (String value : values) {
271                            addTerm(booleanQuery, field, value, like);
272                    }
273            }
274    
275            @Override
276            public void cleanUp(IndexSearcher indexSearcher) {
277                    if (indexSearcher == null) {
278                            return;
279                    }
280    
281                    try {
282                            indexSearcher.close();
283    
284                            IndexReader indexReader = indexSearcher.getIndexReader();
285    
286                            if (indexReader != null) {
287                                    indexReader.close();
288                            }
289                    }
290                    catch (IOException ioe) {
291                            _log.error(ioe, ioe);
292                    }
293            }
294    
295            @Override
296            public int countScoredFieldNames(Query query, String[] filedNames) {
297                    int count = 0;
298    
299                    for (String fieldName : filedNames) {
300                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
301                                    query, false, fieldName);
302    
303                            if ((weightedTerms.length > 0) &&
304                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
305    
306                                    count++;
307                            }
308                    }
309    
310                    return count;
311            }
312    
313            @Override
314            public void delete(long companyId) {
315                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
316    
317                    if (indexAccessor == null) {
318                            return;
319                    }
320    
321                    indexAccessor.delete();
322            }
323    
324            @Override
325            public void deleteDocuments(long companyId, Term term) throws IOException {
326                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327    
328                    if (indexAccessor == null) {
329                            return;
330                    }
331    
332                    indexAccessor.deleteDocuments(term);
333            }
334    
335            @Override
336            public void dumpIndex(long companyId, OutputStream outputStream)
337                    throws IOException {
338    
339                    long lastGeneration = getLastGeneration(companyId);
340    
341                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
342                            if (_log.isDebugEnabled()) {
343                                    _log.debug(
344                                            "Dump index from cluster is not enabled for " + companyId);
345                            }
346    
347                            return;
348                    }
349    
350                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
351    
352                    if (indexAccessor == null) {
353                            return;
354                    }
355    
356                    indexAccessor.dumpIndex(outputStream);
357            }
358    
359            @Override
360            public Analyzer getAnalyzer() {
361                    return _analyzer;
362            }
363    
364            @Override
365            public IndexAccessor getIndexAccessor(long companyId) {
366                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
367    
368                    if (indexAccessor != null) {
369                            return indexAccessor;
370                    }
371    
372                    synchronized (this) {
373                            indexAccessor = _indexAccessors.get(companyId);
374    
375                            if (indexAccessor == null) {
376                                    indexAccessor = new IndexAccessorImpl(companyId);
377    
378                                    if (isLoadIndexFromClusterEnabled()) {
379                                            indexAccessor = new SynchronizedIndexAccessorImpl(
380                                                    indexAccessor);
381    
382                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
383                                                    MessageValuesThreadLocal.getValue(
384                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
385    
386                                            if (clusterForwardMessage) {
387                                                    if (_log.isInfoEnabled()) {
388                                                            _log.info(
389                                                                    "Skip Luncene index files cluster loading " +
390                                                                            "since this is a manual reindex request");
391                                                    }
392                                            }
393                                            else {
394                                                    try {
395                                                            _loadIndexFromCluster(
396                                                                    indexAccessor,
397                                                                    indexAccessor.getLastGeneration());
398                                                    }
399                                                    catch (Exception e) {
400                                                            _log.error(
401                                                                    "Unable to load index for company " +
402                                                                            indexAccessor.getCompanyId(),
403                                                                    e);
404                                                    }
405                                            }
406                                    }
407    
408                                    _indexAccessors.put(companyId, indexAccessor);
409                            }
410                    }
411    
412                    return indexAccessor;
413            }
414    
415            @Override
416            public long getLastGeneration(long companyId) {
417                    if (!isLoadIndexFromClusterEnabled()) {
418                            return IndexAccessor.DEFAULT_LAST_GENERATION;
419                    }
420    
421                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
422    
423                    if (indexAccessor == null) {
424                            return IndexAccessor.DEFAULT_LAST_GENERATION;
425                    }
426    
427                    return indexAccessor.getLastGeneration();
428            }
429    
430            @Override
431            public InputStream getLoadIndexesInputStreamFromCluster(
432                            long companyId, Address bootupAddress)
433                    throws SystemException {
434    
435                    if (!isLoadIndexFromClusterEnabled()) {
436                            return null;
437                    }
438    
439                    InputStream inputStream = null;
440    
441                    try {
442                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
443                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
444    
445                            URL url = bootupClusterNodeObjectValuePair.getValue();
446    
447                            URLConnection urlConnection = url.openConnection();
448    
449                            urlConnection.setDoOutput(true);
450    
451                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
452                                    urlConnection.getOutputStream());
453    
454                            unsyncPrintWriter.write("transientToken=");
455                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
456                            unsyncPrintWriter.write("&companyId=");
457                            unsyncPrintWriter.write(String.valueOf(companyId));
458    
459                            unsyncPrintWriter.close();
460    
461                            inputStream = urlConnection.getInputStream();
462    
463                            return inputStream;
464                    }
465                    catch (IOException ioe) {
466                            throw new SystemException(ioe);
467                    }
468            }
469    
470            @Override
471            public Set<String> getQueryTerms(Query query) {
472                    String queryString = StringUtil.replace(
473                            query.toString(), StringPool.STAR, StringPool.BLANK);
474    
475                    Query tempQuery = null;
476    
477                    try {
478                            QueryParser queryParser = new QueryParser(
479                                    getVersion(), StringPool.BLANK, getAnalyzer());
480    
481                            tempQuery = queryParser.parse(queryString);
482                    }
483                    catch (Exception e) {
484                            if (_log.isWarnEnabled()) {
485                                    _log.warn("Unable to parse " + queryString);
486                            }
487    
488                            tempQuery = query;
489                    }
490    
491                    WeightedTerm[] weightedTerms = null;
492    
493                    for (String fieldName : Field.KEYWORDS) {
494                            weightedTerms = QueryTermExtractor.getTerms(
495                                    tempQuery, false, fieldName);
496    
497                            if (weightedTerms.length > 0) {
498                                    break;
499                            }
500                    }
501    
502                    Set<String> queryTerms = new HashSet<String>();
503    
504                    for (WeightedTerm weightedTerm : weightedTerms) {
505                            queryTerms.add(weightedTerm.getTerm());
506                    }
507    
508                    return queryTerms;
509            }
510    
511            @Override
512            public IndexSearcher getSearcher(long companyId, boolean readOnly)
513                    throws IOException {
514    
515                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
516    
517                    IndexReader indexReader = IndexReader.open(
518                            indexAccessor.getLuceneDir(), readOnly);
519    
520                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
521    
522                    indexSearcher.setDefaultFieldSortScoring(true, true);
523                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
524    
525                    return indexSearcher;
526            }
527    
528            @Override
529            public String getSnippet(
530                            Query query, String field, String s, int maxNumFragments,
531                            int fragmentLength, String fragmentSuffix, Formatter formatter)
532                    throws IOException {
533    
534                    QueryScorer queryScorer = new QueryScorer(query, field);
535    
536                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
537    
538                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
539    
540                    TokenStream tokenStream = getAnalyzer().tokenStream(
541                            field, new UnsyncStringReader(s));
542    
543                    try {
544                            String snippet = highlighter.getBestFragments(
545                                    tokenStream, s, maxNumFragments, fragmentSuffix);
546    
547                            if (Validator.isNotNull(snippet) &&
548                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
549                                    !s.equals(snippet)) {
550    
551                                    snippet = snippet.concat(fragmentSuffix);
552                            }
553    
554                            return snippet;
555                    }
556                    catch (InvalidTokenOffsetsException itoe) {
557                            throw new IOException(itoe.getMessage());
558                    }
559            }
560    
561            @Override
562            public Version getVersion() {
563                    return _version;
564            }
565    
566            @Override
567            public boolean isLoadIndexFromClusterEnabled() {
568                    if (PropsValues.CLUSTER_LINK_ENABLED &&
569                            PropsValues.LUCENE_REPLICATE_WRITE) {
570    
571                            return true;
572                    }
573    
574                    if (_log.isDebugEnabled()) {
575                            _log.debug("Load index from cluster is not enabled");
576                    }
577    
578                    return false;
579            }
580    
581            @Override
582            public void loadIndex(long companyId, InputStream inputStream)
583                    throws IOException {
584    
585                    if (!isLoadIndexFromClusterEnabled()) {
586                            return;
587                    }
588    
589                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
590    
591                    if (indexAccessor == null) {
592                            if (_log.isInfoEnabled()) {
593                                    _log.info(
594                                            "Skip loading Lucene index files for company " + companyId +
595                                                    " in favor of lazy loading");
596                            }
597    
598                            return;
599                    }
600    
601                    StopWatch stopWatch = null;
602    
603                    if (_log.isInfoEnabled()) {
604                            _log.info(
605                                    "Start loading Lucene index files for company " + companyId);
606    
607                            stopWatch = new StopWatch();
608    
609                            stopWatch.start();
610                    }
611    
612                    indexAccessor.loadIndex(inputStream);
613    
614                    if (_log.isInfoEnabled()) {
615                            _log.info(
616                                    "Finished loading index files for company " + companyId +
617                                            " in " + stopWatch.getTime() + " ms");
618                    }
619            }
620    
621            @Override
622            public void loadIndexesFromCluster(long companyId) throws SystemException {
623                    if (!isLoadIndexFromClusterEnabled()) {
624                            return;
625                    }
626    
627                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
628    
629                    if (indexAccessor == null) {
630                            return;
631                    }
632    
633                    long localLastGeneration = getLastGeneration(companyId);
634    
635                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
636            }
637    
638            public void setAnalyzer(Analyzer analyzer) {
639                    _analyzer = analyzer;
640            }
641    
642            public void setVersion(Version version) {
643                    _version = version;
644            }
645    
646            @Override
647            public void shutdown() {
648                    if (_luceneIndexThreadPoolExecutor != null) {
649                            _luceneIndexThreadPoolExecutor.shutdownNow();
650    
651                            try {
652                                    _luceneIndexThreadPoolExecutor.awaitTermination(
653                                            60, TimeUnit.SECONDS);
654                            }
655                            catch (InterruptedException ie) {
656                                    _log.error("Lucene indexer shutdown interrupted", ie);
657                            }
658                    }
659    
660                    if (isLoadIndexFromClusterEnabled()) {
661                            ClusterExecutorUtil.removeClusterEventListener(
662                                    _loadIndexClusterEventListener);
663                    }
664    
665                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
666                            indexAccessor.close();
667                    }
668            }
669    
670            @Override
671            public void shutdown(long companyId) {
672                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
673    
674                    _indexAccessors.remove(indexAccessor);
675    
676                    indexAccessor.close();
677            }
678    
679            @Override
680            public void startup(long companyId) {
681                    if (!PropsValues.INDEX_ON_STARTUP) {
682                            return;
683                    }
684    
685                    if (_log.isInfoEnabled()) {
686                            _log.info("Indexing Lucene on startup");
687                    }
688    
689                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
690    
691                    if (PropsValues.INDEX_WITH_THREAD) {
692                            if (_luceneIndexThreadPoolExecutor == null) {
693    
694                                    // This should never be null except for the case where
695                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
696                                    // PropsValues#INDEX_ON_STARTUP to true.
697    
698                                    _luceneIndexThreadPoolExecutor =
699                                            PortalExecutorManagerUtil.getPortalExecutor(
700                                                    LuceneHelperImpl.class.getName());
701                            }
702    
703                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
704                    }
705                    else {
706                            luceneIndexer.reindex();
707                    }
708            }
709    
710            @Override
711            public void updateDocument(long companyId, Term term, Document document)
712                    throws IOException {
713    
714                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
715    
716                    indexAccessor.updateDocument(term, document);
717            }
718    
719            private LuceneHelperImpl() {
720                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
721                            _luceneIndexThreadPoolExecutor =
722                                    PortalExecutorManagerUtil.getPortalExecutor(
723                                            LuceneHelperImpl.class.getName());
724                    }
725    
726                    if (isLoadIndexFromClusterEnabled()) {
727                            _loadIndexClusterEventListener =
728                                    new LoadIndexClusterEventListener();
729    
730                            ClusterExecutorUtil.addClusterEventListener(
731                                    _loadIndexClusterEventListener);
732                    }
733    
734                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
735            }
736    
737            private ObjectValuePair<String, URL>
738                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
739                    throws SystemException {
740    
741                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
742                            new MethodHandler(
743                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
744                            bootupAddress);
745    
746                    FutureClusterResponses futureClusterResponses =
747                            ClusterExecutorUtil.execute(clusterRequest);
748    
749                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
750                            futureClusterResponses.getPartialResults();
751    
752                    try {
753                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
754                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
755                                    TimeUnit.MILLISECONDS);
756    
757                            String transientToken = (String)clusterNodeResponse.getResult();
758    
759                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
760    
761                            InetAddress inetAddress = clusterNode.getInetAddress();
762    
763                            String fileName = PortalUtil.getPathContext();
764    
765                            if (!fileName.endsWith(StringPool.SLASH)) {
766                                    fileName = fileName.concat(StringPool.SLASH);
767                            }
768    
769                            fileName = fileName.concat("lucene/dump");
770    
771                            URL url = new URL(
772                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
773                                    fileName);
774    
775                            return new ObjectValuePair<String, URL>(transientToken, url);
776                    }
777                    catch (Exception e) {
778                            throw new SystemException(e);
779                    }
780            }
781    
782            private void _includeIfUnique(
783                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
784                    boolean like) {
785    
786                    if (query instanceof TermQuery) {
787                            Set<Term> terms = new HashSet<Term>();
788    
789                            TermQuery termQuery = (TermQuery)query;
790    
791                            termQuery.extractTerms(terms);
792    
793                            float boost = termQuery.getBoost();
794    
795                            for (Term term : terms) {
796                                    String termValue = term.text();
797    
798                                    if (like) {
799                                            term = term.createTerm(
800                                                    StringPool.STAR.concat(termValue).concat(
801                                                            StringPool.STAR));
802    
803                                            query = new WildcardQuery(term);
804                                    }
805                                    else {
806                                            query = new TermQuery(term);
807                                    }
808    
809                                    query.setBoost(boost);
810    
811                                    boolean included = false;
812    
813                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
814                                            if (query.equals(booleanClause.getQuery())) {
815                                                    included = true;
816                                            }
817                                    }
818    
819                                    if (!included) {
820                                            booleanQuery.add(query, occur);
821                                    }
822                            }
823                    }
824                    else if (query instanceof BooleanQuery) {
825                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
826    
827                            BooleanQuery containerBooleanQuery = new BooleanQuery();
828    
829                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
830                                    _includeIfUnique(
831                                            containerBooleanQuery, booleanClause.getQuery(),
832                                            booleanClause.getOccur(), like);
833                            }
834    
835                            if (containerBooleanQuery.getClauses().length > 0) {
836                                    booleanQuery.add(containerBooleanQuery, occur);
837                            }
838                    }
839                    else {
840                            boolean included = false;
841    
842                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
843                                    if (query.equals(booleanClause.getQuery())) {
844                                            included = true;
845                                    }
846                            }
847    
848                            if (!included) {
849                                    booleanQuery.add(query, occur);
850                            }
851                    }
852            }
853    
854            private void _loadIndexFromCluster(
855                            IndexAccessor indexAccessor, long localLastGeneration)
856                    throws SystemException {
857    
858                    List<Address> clusterNodeAddresses =
859                            ClusterExecutorUtil.getClusterNodeAddresses();
860    
861                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
862    
863                    if (clusterNodeAddressesCount <= 1) {
864                            if (_log.isDebugEnabled()) {
865                                    _log.debug(
866                                            "Do not load indexes because there is either one portal " +
867                                                    "instance or no portal instances in the cluster");
868                            }
869    
870                            return;
871                    }
872    
873                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
874                            new MethodHandler(
875                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
876                            true);
877    
878                    ClusterExecutorUtil.execute(
879                            clusterRequest,
880                            new LoadIndexClusterResponseCallback(
881                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
882            }
883    
        // Time to wait for a cluster node's response. It is used as a poll
        // timeout together with TimeUnit.MILLISECONDS.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Clause limit applied through BooleanQuery.setMaxClauseCount in the
        // constructor. Lucene's current limit is passed to GetterUtil as the
        // fallback value.

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        // Argument passed to TransientTokenUtil#createToken on the bootup node.
        // NOTE(review): presumably a keep-alive in milliseconds; confirm against
        // TransientTokenUtil.

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for the remote calls sent through ClusterExecutorUtil.
        // The first is a unicast token request; the second is a multicast
        // last-generation query.

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        private Analyzer _analyzer;

        // Index accessors per key. NOTE(review): keys are presumably company ids;
        // confirm against getIndexAccessor.

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();

        // Assigned by the constructor only when loading indexes from the
        // cluster is enabled.

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;

        // Pool used for threaded startup indexing. It is created in the
        // constructor or lazily when startup indexing runs.

        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
907    
908            private class LoadIndexClusterEventListener
909                    implements ClusterEventListener {
910    
911                    @Override
912                    public void processClusterEvent(ClusterEvent clusterEvent) {
913                            ClusterEventType clusterEventType =
914                                    clusterEvent.getClusterEventType();
915    
916                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
917                                    return;
918                            }
919    
920                            List<Address> clusterNodeAddresses =
921                                    ClusterExecutorUtil.getClusterNodeAddresses();
922                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
923    
924                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
925                                    if (_log.isDebugEnabled()) {
926                                            _log.debug(
927                                                    "Number of original cluster members is greater than " +
928                                                            "one");
929                                    }
930    
931                                    return;
932                            }
933    
934                            long[] companyIds = PortalInstances.getCompanyIds();
935    
936                            for (long companyId : companyIds) {
937                                    loadIndexes(companyId);
938                            }
939    
940                            loadIndexes(CompanyConstants.SYSTEM);
941                    }
942    
943                    private void loadIndexes(long companyId) {
944                            long lastGeneration = getLastGeneration(companyId);
945    
946                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
947                                    return;
948                            }
949    
950                            try {
951                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
952                            }
953                            catch (Exception e) {
954                                    _log.error(
955                                            "Unable to load indexes for company " + companyId, e);
956                            }
957                    }
958    
959            }
960    
961            private class LoadIndexClusterResponseCallback
962                    extends BaseClusterResponseCallback {
963    
964                    public LoadIndexClusterResponseCallback(
965                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
966                            long localLastGeneration) {
967    
968                            _indexAccessor = indexAccessor;
969                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
970                            _localLastGeneration = localLastGeneration;
971    
972                            _companyId = _indexAccessor.getCompanyId();
973                    }
974    
975                    @Override
976                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
977                            Address bootupAddress = null;
978    
979                            do {
980                                    _clusterNodeAddressesCount--;
981    
982                                    ClusterNodeResponse clusterNodeResponse = null;
983    
984                                    try {
985                                            clusterNodeResponse = blockingQueue.poll(
986                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
987                                                    TimeUnit.MILLISECONDS);
988                                    }
989                                    catch (Exception e) {
990                                            _log.error("Unable to get cluster node response", e);
991                                    }
992    
993                                    if (clusterNodeResponse == null) {
994                                            if (_log.isDebugEnabled()) {
995                                                    _log.debug(
996                                                            "Unable to get cluster node response in " +
997                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
998                                                                            TimeUnit.MILLISECONDS);
999                                            }
1000    
1001                                            continue;
1002                                    }
1003    
1004                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1005    
1006                                    if (clusterNode.getPort() > 0) {
1007                                            try {
1008                                                    long remoteLastGeneration =
1009                                                            (Long)clusterNodeResponse.getResult();
1010    
1011                                                    if (remoteLastGeneration > _localLastGeneration) {
1012                                                            bootupAddress = clusterNodeResponse.getAddress();
1013    
1014                                                            break;
1015                                                    }
1016                                            }
1017                                            catch (Exception e) {
1018                                                    if (_log.isDebugEnabled()) {
1019                                                            _log.debug(
1020                                                                    "Suppress exception caused by remote method " +
1021                                                                            "invocation",
1022                                                                    e);
1023                                                    }
1024    
1025                                                    continue;
1026                                            }
1027                                    }
1028                                    else {
1029                                            if (_log.isDebugEnabled()) {
1030                                                    _log.debug(
1031                                                            "Cluster node " + clusterNode +
1032                                                                    " has invalid port");
1033                                            }
1034                                    }
1035                            }
1036                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1037    
1038                            if (bootupAddress == null) {
1039                                    return;
1040                            }
1041    
1042                            if (_log.isInfoEnabled()) {
1043                                    _log.info(
1044                                            "Start loading lucene index files from cluster node " +
1045                                                    bootupAddress);
1046                            }
1047    
1048                            InputStream inputStream = null;
1049    
1050                            try {
1051                                    inputStream = getLoadIndexesInputStreamFromCluster(
1052                                            _companyId, bootupAddress);
1053    
1054                                    _indexAccessor.loadIndex(inputStream);
1055    
1056                                    if (_log.isInfoEnabled()) {
1057                                            _log.info("Lucene index files loaded successfully");
1058                                    }
1059                            }
1060                            catch (Exception e) {
1061                                    _log.error("Unable to load index for company " + _companyId, e);
1062                            }
1063                            finally {
1064                                    if (inputStream != null) {
1065                                            try {
1066                                                    inputStream.close();
1067                                            }
1068                                            catch (IOException ioe) {
1069                                                    _log.error(
1070                                                            "Unable to close input stream for company " +
1071                                                                    _companyId,
1072                                                            ioe);
1073                                            }
1074                                    }
1075                            }
1076                    }
1077    
1078                    @Override
1079                    public void processTimeoutException(TimeoutException timeoutException) {
1080                            _log.error(
1081                                    "Unable to load index for company " + _companyId,
1082                                    timeoutException);
1083                    }
1084    
1085                    private int _clusterNodeAddressesCount;
1086                    private long _companyId;
1087                    private IndexAccessor _indexAccessor;
1088                    private long _localLastGeneration;
1089    
1090            }
1091    
1092    }