001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.StringPool;
044    import com.liferay.portal.kernel.util.StringUtil;
045    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046    import com.liferay.portal.kernel.util.Validator;
047    import com.liferay.portal.model.CompanyConstants;
048    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050    import com.liferay.portal.security.auth.TransientTokenUtil;
051    import com.liferay.portal.util.PortalInstances;
052    import com.liferay.portal.util.PortalUtil;
053    import com.liferay.portal.util.PropsValues;
054    import com.liferay.util.lucene.KeywordsUtil;
055    
056    import java.io.IOException;
057    import java.io.InputStream;
058    import java.io.OutputStream;
059    
060    import java.net.InetAddress;
061    import java.net.URL;
062    import java.net.URLConnection;
063    
064    import java.util.HashSet;
065    import java.util.List;
066    import java.util.Map;
067    import java.util.Set;
068    import java.util.concurrent.BlockingQueue;
069    import java.util.concurrent.ConcurrentHashMap;
070    import java.util.concurrent.TimeUnit;
071    import java.util.concurrent.TimeoutException;
072    
073    import org.apache.commons.lang.time.StopWatch;
074    import org.apache.lucene.analysis.Analyzer;
075    import org.apache.lucene.analysis.TokenStream;
076    import org.apache.lucene.document.Document;
077    import org.apache.lucene.index.IndexReader;
078    import org.apache.lucene.index.Term;
079    import org.apache.lucene.queryParser.QueryParser;
080    import org.apache.lucene.search.BooleanClause;
081    import org.apache.lucene.search.BooleanQuery;
082    import org.apache.lucene.search.IndexSearcher;
083    import org.apache.lucene.search.NumericRangeQuery;
084    import org.apache.lucene.search.Query;
085    import org.apache.lucene.search.TermQuery;
086    import org.apache.lucene.search.TermRangeQuery;
087    import org.apache.lucene.search.WildcardQuery;
088    import org.apache.lucene.search.highlight.Formatter;
089    import org.apache.lucene.search.highlight.Highlighter;
090    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
091    import org.apache.lucene.search.highlight.QueryScorer;
092    import org.apache.lucene.search.highlight.SimpleFragmenter;
093    import org.apache.lucene.search.highlight.WeightedTerm;
094    import org.apache.lucene.util.Version;
095    
096    /**
097     * @author Brian Wing Shun Chan
098     * @author Harry Mark
099     * @author Bruno Farache
100     * @author Shuyang Zhou
101     * @author Tina Tian
102     * @author Hugo Huijser
103     */
104    public class LuceneHelperImpl implements LuceneHelper {
105    
106            public void addDocument(long companyId, Document document)
107                    throws IOException {
108    
109                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
110    
111                    indexAccessor.addDocument(document);
112            }
113    
114            public void addExactTerm(
115                    BooleanQuery booleanQuery, String field, String value) {
116    
117                    addTerm(booleanQuery, field, value, false);
118            }
119    
120            public void addNumericRangeTerm(
121                    BooleanQuery booleanQuery, String field, String startValue,
122                    String endValue) {
123    
124                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
125                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
126                            true, true);
127    
128                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
129            }
130    
131            public void addRangeTerm(
132                    BooleanQuery booleanQuery, String field, String startValue,
133                    String endValue) {
134    
135                    boolean includesLower = true;
136    
137                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
138                            includesLower = false;
139                    }
140    
141                    boolean includesUpper = true;
142    
143                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
144                            includesUpper = false;
145                    }
146    
147                    TermRangeQuery termRangeQuery = new TermRangeQuery(
148                            field, startValue, endValue, includesLower, includesUpper);
149    
150                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
151            }
152    
153            public void addRequiredTerm(
154                    BooleanQuery booleanQuery, String field, String value, boolean like) {
155    
156                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
157            }
158    
159            public void addRequiredTerm(
160                    BooleanQuery booleanQuery, String field, String[] values,
161                    boolean like) {
162    
163                    if (values == null) {
164                            return;
165                    }
166    
167                    BooleanQuery query = new BooleanQuery();
168    
169                    for (String value : values) {
170                            addTerm(query, field, value, like);
171                    }
172    
173                    booleanQuery.add(query, BooleanClause.Occur.MUST);
174            }
175    
176            public void addTerm(
177                    BooleanQuery booleanQuery, String field, String value, boolean like) {
178    
179                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
180            }
181    
182            public void addTerm(
183                    BooleanQuery booleanQuery, String field, String value, boolean like,
184                    BooleanClauseOccur booleanClauseOccur) {
185    
186                    if (Validator.isNull(value)) {
187                            return;
188                    }
189    
190                    Analyzer analyzer = getAnalyzer();
191    
192                    if (analyzer instanceof PerFieldAnalyzer) {
193                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
194    
195                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
196    
197                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
198                                    like = true;
199                            }
200                    }
201    
202                    if (like) {
203                            value = StringUtil.replace(
204                                    value, StringPool.PERCENT, StringPool.BLANK);
205                    }
206    
207                    try {
208                            QueryParser queryParser = new QueryParser(
209                                    getVersion(), field, analyzer);
210    
211                            queryParser.setAllowLeadingWildcard(true);
212    
213                            Query query = null;
214    
215                            try {
216                                    if (like) {
217    
218                                            // LUCENE-89
219    
220                                            value = value.toLowerCase(queryParser.getLocale());
221                                    }
222    
223                                    query = queryParser.parse(value);
224                            }
225                            catch (Exception e) {
226                                    query = queryParser.parse(KeywordsUtil.escape(value));
227                            }
228    
229                            BooleanClause.Occur occur = null;
230    
231                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
232                                    occur = BooleanClause.Occur.MUST;
233                            }
234                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
235                                    occur = BooleanClause.Occur.MUST_NOT;
236                            }
237                            else {
238                                    occur = BooleanClause.Occur.SHOULD;
239                            }
240    
241                            _includeIfUnique(booleanQuery, query, occur, like);
242                    }
243                    catch (Exception e) {
244                            _log.error(e, e);
245                    }
246            }
247    
248            public void addTerm(
249                    BooleanQuery booleanQuery, String field, String[] values,
250                    boolean like) {
251    
252                    for (String value : values) {
253                            addTerm(booleanQuery, field, value, like);
254                    }
255            }
256    
257            public int countScoredFieldNames(Query query, String[] filedNames) {
258                    int count = 0;
259    
260                    for (String fieldName : filedNames) {
261                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
262                                    query, false, fieldName);
263    
264                            if ((weightedTerms.length > 0) &&
265                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
266    
267                                    count++;
268                            }
269                    }
270    
271                    return count;
272            }
273    
274            public void delete(long companyId) {
275                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
276    
277                    if (indexAccessor == null) {
278                            return;
279                    }
280    
281                    indexAccessor.delete();
282            }
283    
284            public void deleteDocuments(long companyId, Term term) throws IOException {
285                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
286    
287                    if (indexAccessor == null) {
288                            return;
289                    }
290    
291                    indexAccessor.deleteDocuments(term);
292            }
293    
294            public void dumpIndex(long companyId, OutputStream outputStream)
295                    throws IOException {
296    
297                    long lastGeneration = getLastGeneration(companyId);
298    
299                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
300                            if (_log.isDebugEnabled()) {
301                                    _log.debug(
302                                            "Dump index from cluster is not enabled for " + companyId);
303                            }
304    
305                            return;
306                    }
307    
308                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
309    
310                    if (indexAccessor == null) {
311                            return;
312                    }
313    
314                    indexAccessor.dumpIndex(outputStream);
315            }
316    
317            public Analyzer getAnalyzer() {
318                    return _analyzer;
319            }
320    
321            public long getLastGeneration(long companyId) {
322                    if (!isLoadIndexFromClusterEnabled()) {
323                            return IndexAccessor.DEFAULT_LAST_GENERATION;
324                    }
325    
326                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327    
328                    if (indexAccessor == null) {
329                            return IndexAccessor.DEFAULT_LAST_GENERATION;
330                    }
331    
332                    return indexAccessor.getLastGeneration();
333            }
334    
335            public InputStream getLoadIndexesInputStreamFromCluster(
336                            long companyId, Address bootupAddress)
337                    throws SystemException {
338    
339                    if (!isLoadIndexFromClusterEnabled()) {
340                            return null;
341                    }
342    
343                    InputStream inputStream = null;
344    
345                    try {
346                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
347                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
348    
349                            URL url = bootupClusterNodeObjectValuePair.getValue();
350    
351                            URLConnection urlConnection = url.openConnection();
352    
353                            urlConnection.setDoOutput(true);
354    
355                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
356                                    urlConnection.getOutputStream());
357    
358                            unsyncPrintWriter.write("transientToken=");
359                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
360                            unsyncPrintWriter.write("&companyId=");
361                            unsyncPrintWriter.write(String.valueOf(companyId));
362    
363                            unsyncPrintWriter.close();
364    
365                            inputStream = urlConnection.getInputStream();
366    
367                            return inputStream;
368                    }
369                    catch (IOException ioe) {
370                            throw new SystemException(ioe);
371                    }
372            }
373    
374            public Set<String> getQueryTerms(Query query) {
375                    String queryString = StringUtil.replace(
376                            query.toString(), StringPool.STAR, StringPool.BLANK);
377    
378                    Query tempQuery = null;
379    
380                    try {
381                            QueryParser queryParser = new QueryParser(
382                                    getVersion(), StringPool.BLANK, getAnalyzer());
383    
384                            tempQuery = queryParser.parse(queryString);
385                    }
386                    catch (Exception e) {
387                            if (_log.isWarnEnabled()) {
388                                    _log.warn("Unable to parse " + queryString);
389                            }
390    
391                            tempQuery = query;
392                    }
393    
394                    WeightedTerm[] weightedTerms = null;
395    
396                    for (String fieldName : Field.KEYWORDS) {
397                            weightedTerms = QueryTermExtractor.getTerms(
398                                    tempQuery, false, fieldName);
399    
400                            if (weightedTerms.length > 0) {
401                                    break;
402                            }
403                    }
404    
405                    Set<String> queryTerms = new HashSet<String>();
406    
407                    for (WeightedTerm weightedTerm : weightedTerms) {
408                            queryTerms.add(weightedTerm.getTerm());
409                    }
410    
411                    return queryTerms;
412            }
413    
414            public IndexSearcher getSearcher(long companyId, boolean readOnly)
415                    throws IOException {
416    
417                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
418    
419                    IndexReader indexReader = IndexReader.open(
420                            indexAccessor.getLuceneDir(), readOnly);
421    
422                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
423    
424                    indexSearcher.setDefaultFieldSortScoring(true, true);
425                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
426    
427                    return indexSearcher;
428            }
429    
430            public String getSnippet(
431                            Query query, String field, String s, int maxNumFragments,
432                            int fragmentLength, String fragmentSuffix, Formatter formatter)
433                    throws IOException {
434    
435                    QueryScorer queryScorer = new QueryScorer(query, field);
436    
437                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
438    
439                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
440    
441                    TokenStream tokenStream = getAnalyzer().tokenStream(
442                            field, new UnsyncStringReader(s));
443    
444                    try {
445                            String snippet = highlighter.getBestFragments(
446                                    tokenStream, s, maxNumFragments, fragmentSuffix);
447    
448                            if (Validator.isNotNull(snippet) &&
449                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
450                                    !s.equals(snippet)) {
451    
452                                    snippet = snippet.concat(fragmentSuffix);
453                            }
454    
455                            return snippet;
456                    }
457                    catch (InvalidTokenOffsetsException itoe) {
458                            throw new IOException(itoe.getMessage());
459                    }
460            }
461    
462            public Version getVersion() {
463                    return _version;
464            }
465    
466            public boolean isLoadIndexFromClusterEnabled() {
467                    if (PropsValues.CLUSTER_LINK_ENABLED &&
468                            PropsValues.LUCENE_REPLICATE_WRITE) {
469    
470                            return true;
471                    }
472    
473                    if (_log.isDebugEnabled()) {
474                            _log.debug("Load index from cluster is not enabled");
475                    }
476    
477                    return false;
478            }
479    
480            public void loadIndex(long companyId, InputStream inputStream)
481                    throws IOException {
482    
483                    if (!isLoadIndexFromClusterEnabled()) {
484                            return;
485                    }
486    
487                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
488    
489                    if (indexAccessor == null) {
490                            if (_log.isInfoEnabled()) {
491                                    _log.info(
492                                            "Skip loading Lucene index files for company " + companyId +
493                                                    " in favor of lazy loading");
494                            }
495    
496                            return;
497                    }
498    
499                    StopWatch stopWatch = null;
500    
501                    if (_log.isInfoEnabled()) {
502                            _log.info(
503                                    "Start loading Lucene index files for company " + companyId);
504    
505                            stopWatch = new StopWatch();
506    
507                            stopWatch.start();
508                    }
509    
510                    indexAccessor.loadIndex(inputStream);
511    
512                    if (_log.isInfoEnabled()) {
513                            _log.info(
514                                    "Finished loading index files for company " + companyId +
515                                            " in " + stopWatch.getTime() + " ms");
516                    }
517            }
518    
519            public void loadIndexesFromCluster(long companyId) throws SystemException {
520                    if (!isLoadIndexFromClusterEnabled()) {
521                            return;
522                    }
523    
524                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
525    
526                    if (indexAccessor == null) {
527                            return;
528                    }
529    
530                    long localLastGeneration = getLastGeneration(companyId);
531    
532                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
533            }
534    
535            public void setAnalyzer(Analyzer analyzer) {
536                    _analyzer = analyzer;
537            }
538    
539            public void setVersion(Version version) {
540                    _version = version;
541            }
542    
543            public void shutdown() {
544                    if (_luceneIndexThreadPoolExecutor != null) {
545                            _luceneIndexThreadPoolExecutor.shutdownNow();
546    
547                            try {
548                                    _luceneIndexThreadPoolExecutor.awaitTermination(
549                                            60, TimeUnit.SECONDS);
550                            }
551                            catch (InterruptedException ie) {
552                                    _log.error("Lucene indexer shutdown interrupted", ie);
553                            }
554                    }
555    
556                    if (isLoadIndexFromClusterEnabled()) {
557                            ClusterExecutorUtil.removeClusterEventListener(
558                                    _loadIndexClusterEventListener);
559                    }
560    
561                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
562                            indexAccessor.close();
563                    }
564            }
565    
566            public void startup(long companyId) {
567                    if (PropsValues.INDEX_ON_STARTUP) {
568                            if (_log.isInfoEnabled()) {
569                                    _log.info("Indexing Lucene on startup");
570                            }
571    
572                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
573    
574                            if (PropsValues.INDEX_WITH_THREAD) {
575                                    if (_luceneIndexThreadPoolExecutor == null) {
576    
577                                            // This should never be null except for the case where
578                                            // VerifyProcessUtil#_verifyProcess(boolean) sets
579                                            // PropsValues#INDEX_ON_STARTUP to true.
580    
581                                            _luceneIndexThreadPoolExecutor =
582                                                    PortalExecutorManagerUtil.getPortalExecutor(
583                                                            LuceneHelperImpl.class.getName());
584                                    }
585    
586                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
587                            }
588                            else {
589                                    luceneIndexer.reindex();
590                            }
591                    }
592            }
593    
594            public void updateDocument(long companyId, Term term, Document document)
595                    throws IOException {
596    
597                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
598    
599                    indexAccessor.updateDocument(term, document);
600            }
601    
602            private LuceneHelperImpl() {
603                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
604                            _luceneIndexThreadPoolExecutor =
605                                    PortalExecutorManagerUtil.getPortalExecutor(
606                                            LuceneHelperImpl.class.getName());
607                    }
608    
609                    if (isLoadIndexFromClusterEnabled()) {
610                            _loadIndexClusterEventListener =
611                                    new LoadIndexClusterEventListener();
612    
613                            ClusterExecutorUtil.addClusterEventListener(
614                                    _loadIndexClusterEventListener);
615                    }
616            }
617    
618            private ObjectValuePair<String, URL>
619                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
620                    throws SystemException {
621    
622                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
623                            new MethodHandler(
624                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
625                            bootupAddress);
626    
627                    FutureClusterResponses futureClusterResponses =
628                            ClusterExecutorUtil.execute(clusterRequest);
629    
630                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
631                            futureClusterResponses.getPartialResults();
632    
633                    try {
634                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
635                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
636    
637                            String transientToken = (String)clusterNodeResponse.getResult();
638    
639                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
640    
641                            InetAddress inetAddress = clusterNode.getInetAddress();
642    
643                            String fileName = PortalUtil.getPathContext();
644    
645                            if (!fileName.endsWith(StringPool.SLASH)) {
646                                    fileName = fileName.concat(StringPool.SLASH);
647                            }
648    
649                            fileName = fileName.concat("lucene/dump");
650    
651                            URL url = new URL(
652                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
653                                    fileName);
654    
655                            return new ObjectValuePair<String, URL>(transientToken, url);
656                    }
657                    catch (Exception e) {
658                            throw new SystemException(e);
659                    }
660            }
661    
662            private IndexAccessor _getIndexAccessor(long companyId) {
663                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
664    
665                    if (indexAccessor != null) {
666                            return indexAccessor;
667                    }
668    
669                    synchronized (this) {
670                            indexAccessor = _indexAccessors.get(companyId);
671    
672                            if (indexAccessor == null) {
673                                    indexAccessor = new IndexAccessorImpl(companyId);
674    
675                                    if (isLoadIndexFromClusterEnabled()) {
676                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
677                                                    MessageValuesThreadLocal.getValue(
678                                                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
679    
680                                            if (clusterForwardMessage) {
681                                                    if (_log.isInfoEnabled()) {
682                                                            _log.info(
683                                                                    "Skip Luncene index files cluster loading " +
684                                                                            "since this is a manual reindex request");
685                                                    }
686                                            }
687                                            else {
688                                                    indexAccessor = new SynchronizedIndexAccessorImpl(
689                                                            indexAccessor);
690    
691                                                    try {
692                                                            _loadIndexFromCluster(
693                                                                    indexAccessor,
694                                                                    indexAccessor.getLastGeneration());
695                                                    }
696                                                    catch (Exception e) {
697                                                            _log.error(
698                                                                    "Unable to load index for company " +
699                                                                            indexAccessor.getCompanyId(),
700                                                                    e);
701                                                    }
702                                            }
703                                    }
704    
705                                    _indexAccessors.put(companyId, indexAccessor);
706                            }
707                    }
708    
709                    return indexAccessor;
710            }
711    
712            private void _includeIfUnique(
713                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
714                    boolean like) {
715    
716                    if (query instanceof TermQuery) {
717                            Set<Term> terms = new HashSet<Term>();
718    
719                            query.extractTerms(terms);
720    
721                            for (Term term : terms) {
722                                    String termValue = term.text();
723    
724                                    if (like) {
725                                            term = term.createTerm(
726                                                    StringPool.STAR.concat(termValue).concat(
727                                                            StringPool.STAR));
728    
729                                            query = new WildcardQuery(term);
730                                    }
731                                    else {
732                                            query = new TermQuery(term);
733                                    }
734    
735                                    boolean included = false;
736    
737                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
738                                            if (query.equals(booleanClause.getQuery())) {
739                                                    included = true;
740                                            }
741                                    }
742    
743                                    if (!included) {
744                                            booleanQuery.add(query, occur);
745                                    }
746                            }
747                    }
748                    else if (query instanceof BooleanQuery) {
749                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
750    
751                            BooleanQuery containerBooleanQuery = new BooleanQuery();
752    
753                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
754                                    _includeIfUnique(
755                                            containerBooleanQuery, booleanClause.getQuery(),
756                                            booleanClause.getOccur(), like);
757                            }
758    
759                            if (containerBooleanQuery.getClauses().length > 0) {
760                                    booleanQuery.add(containerBooleanQuery, occur);
761                            }
762                    }
763                    else {
764                            boolean included = false;
765    
766                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
767                                    if (query.equals(booleanClause.getQuery())) {
768                                            included = true;
769                                    }
770                            }
771    
772                            if (!included) {
773                                    booleanQuery.add(query, occur);
774                            }
775                    }
776            }
777    
778            private void _loadIndexFromCluster(
779                            IndexAccessor indexAccessor, long localLastGeneration)
780                    throws SystemException {
781    
782                    List<Address> clusterNodeAddresses =
783                            ClusterExecutorUtil.getClusterNodeAddresses();
784    
785                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
786    
787                    if (clusterNodeAddressesCount <= 1) {
788                            if (_log.isDebugEnabled()) {
789                                    _log.debug(
790                                            "Do not load indexes because there is either one portal " +
791                                                    "instance or no portal instances in the cluster");
792                            }
793    
794                            return;
795                    }
796    
797                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
798                            new MethodHandler(
799                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
800                            true);
801    
802                    ClusterExecutorUtil.execute(
803                            clusterRequest,
804                            new LoadIndexClusterResponseCallback(
805                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
806            }
807    
        // Maximum time, in milliseconds, that LoadIndexClusterResponseCallback
        // waits on each poll for a cluster node response

        private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;

        // NOTE(review): presumably the lifetime, in milliseconds, of the
        // transient token requested through _createTokenMethodKey; the usage is
        // outside this section, confirm against the caller

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for TransientTokenUtil.createToken(long) and
        // LuceneHelperUtil.getLastGeneration(long); the latter is multicast to
        // the cluster by _loadIndexFromCluster

        private static MethodKey _createTokenMethodKey =
                new MethodKey(TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey =
                new MethodKey(LuceneHelperUtil.class, "getLastGeneration", long.class);

        private Analyzer _analyzer;

        // Index accessors keyed by company ID

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();

        // NOTE(review): assigned and registered outside this section; verify
        // where the listener is added to and removed from the cluster executor

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
825    
826            private class LoadIndexClusterEventListener
827                    implements ClusterEventListener {
828    
829                    public void processClusterEvent(ClusterEvent clusterEvent) {
830                            ClusterEventType clusterEventType =
831                                    clusterEvent.getClusterEventType();
832    
833                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
834                                    return;
835                            }
836    
837                            List<Address> clusterNodeAddresses =
838                                    ClusterExecutorUtil.getClusterNodeAddresses();
839                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
840    
841                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
842                                    if (_log.isDebugEnabled()) {
843                                            _log.debug(
844                                                    "Number of original cluster members is greater than " +
845                                                            "one");
846                                    }
847    
848                                    return;
849                            }
850    
851                            long[] companyIds = PortalInstances.getCompanyIds();
852    
853                            for (long companyId : companyIds) {
854                                    loadIndexes(companyId);
855                            }
856    
857                            loadIndexes(CompanyConstants.SYSTEM);
858                    }
859    
860                    private void loadIndexes(long companyId) {
861                            long lastGeneration = getLastGeneration(companyId);
862    
863                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
864                                    return;
865                            }
866    
867                            try {
868                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
869                            }
870                            catch (Exception e) {
871                                    _log.error(
872                                            "Unable to load indexes for company " + companyId, e);
873                            }
874                    }
875    
876            }
877    
878            private class LoadIndexClusterResponseCallback
879                    extends BaseClusterResponseCallback {
880    
881                    public LoadIndexClusterResponseCallback(
882                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
883                            long localLastGeneration) {
884    
885                            _indexAccessor = indexAccessor;
886                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
887                            _localLastGeneration = localLastGeneration;
888    
889                            _companyId = _indexAccessor.getCompanyId();
890                    }
891    
892                    @Override
893                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
894                            Address bootupAddress = null;
895    
896                            do {
897                                    _clusterNodeAddressesCount--;
898    
899                                    ClusterNodeResponse clusterNodeResponse = null;
900    
901                                    try {
902                                            clusterNodeResponse = blockingQueue.poll(
903                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
904                                                    TimeUnit.MILLISECONDS);
905                                    }
906                                    catch (Exception e) {
907                                            _log.error("Unable to get cluster node response", e);
908                                    }
909    
910                                    if (clusterNodeResponse == null) {
911                                            if (_log.isDebugEnabled()) {
912                                                    _log.debug(
913                                                            "Unable to get cluster node response in " +
914                                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
915                                                                            TimeUnit.MILLISECONDS);
916                                            }
917    
918                                            continue;
919                                    }
920    
921                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
922    
923                                    if (clusterNode.getPort() > 0) {
924                                            try {
925                                                    long remoteLastGeneration =
926                                                            (Long)clusterNodeResponse.getResult();
927    
928                                                    if (remoteLastGeneration > _localLastGeneration) {
929                                                            bootupAddress = clusterNodeResponse.getAddress();
930    
931                                                            break;
932                                                    }
933                                            }
934                                            catch (Exception e) {
935                                                    if (_log.isDebugEnabled()) {
936                                                            _log.debug(
937                                                                    "Suppress exception caused by remote method " +
938                                                                            "invocation",
939                                                                    e);
940                                                    }
941    
942                                                    continue;
943                                            }
944                                    }
945                                    else {
946                                            if (_log.isDebugEnabled()) {
947                                                    _log.debug(
948                                                            "Cluster node " + clusterNode +
949                                                                    " has invalid port");
950                                            }
951                                    }
952                            }
953                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
954    
955                            if (bootupAddress == null) {
956                                    return;
957                            }
958    
959                            if (_log.isInfoEnabled()) {
960                                    _log.info(
961                                            "Start loading lucene index files from cluster node " +
962                                                    bootupAddress);
963                            }
964    
965                            InputStream inputStream = null;
966    
967                            try {
968                                    inputStream = getLoadIndexesInputStreamFromCluster(
969                                            _companyId, bootupAddress);
970    
971                                    _indexAccessor.loadIndex(inputStream);
972    
973                                    if (_log.isInfoEnabled()) {
974                                            _log.info("Lucene index files loaded successfully");
975                                    }
976                            }
977                            catch (Exception e) {
978                                    _log.error("Unable to load index for company " + _companyId, e);
979                            }
980                            finally {
981                                    if (inputStream != null) {
982                                            try {
983                                                    inputStream.close();
984                                            }
985                                            catch (IOException ioe) {
986                                                    _log.error(
987                                                            "Unable to close input stream for company " +
988                                                                    _companyId,
989                                                            ioe);
990                                            }
991                                    }
992                            }
993                    }
994    
995                    @Override
996                    public void processTimeoutException(TimeoutException timeoutException) {
997                            _log.error(
998                                    "Uanble to load index for company " + _companyId,
999                                    timeoutException);
1000                    }
1001    
1002                    private int _clusterNodeAddressesCount;
1003                    private long _companyId;
1004                    private IndexAccessor _indexAccessor;
1005                    private long _localLastGeneration;
1006    
1007            }
1008    
1009    }