001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.StringPool;
044    import com.liferay.portal.kernel.util.StringUtil;
045    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046    import com.liferay.portal.kernel.util.Validator;
047    import com.liferay.portal.model.CompanyConstants;
048    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050    import com.liferay.portal.security.auth.TransientTokenUtil;
051    import com.liferay.portal.util.PortalInstances;
052    import com.liferay.portal.util.PortalUtil;
053    import com.liferay.portal.util.PropsValues;
054    import com.liferay.util.lucene.KeywordsUtil;
055    
056    import java.io.IOException;
057    import java.io.InputStream;
058    import java.io.OutputStream;
059    
060    import java.net.InetAddress;
061    import java.net.URL;
062    import java.net.URLConnection;
063    
064    import java.util.HashSet;
065    import java.util.List;
066    import java.util.Map;
067    import java.util.Set;
068    import java.util.concurrent.BlockingQueue;
069    import java.util.concurrent.ConcurrentHashMap;
070    import java.util.concurrent.TimeUnit;
071    import java.util.concurrent.TimeoutException;
072    
073    import org.apache.commons.lang.time.StopWatch;
074    import org.apache.lucene.analysis.Analyzer;
075    import org.apache.lucene.analysis.TokenStream;
076    import org.apache.lucene.document.Document;
077    import org.apache.lucene.index.IndexReader;
078    import org.apache.lucene.index.Term;
079    import org.apache.lucene.queryParser.QueryParser;
080    import org.apache.lucene.search.BooleanClause;
081    import org.apache.lucene.search.BooleanQuery;
082    import org.apache.lucene.search.IndexSearcher;
083    import org.apache.lucene.search.NumericRangeQuery;
084    import org.apache.lucene.search.Query;
085    import org.apache.lucene.search.TermQuery;
086    import org.apache.lucene.search.TermRangeQuery;
087    import org.apache.lucene.search.WildcardQuery;
088    import org.apache.lucene.search.highlight.Formatter;
089    import org.apache.lucene.search.highlight.Highlighter;
090    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
091    import org.apache.lucene.search.highlight.QueryScorer;
092    import org.apache.lucene.search.highlight.SimpleFragmenter;
093    import org.apache.lucene.search.highlight.WeightedTerm;
094    import org.apache.lucene.util.Version;
095    
096    /**
097     * @author Brian Wing Shun Chan
098     * @author Harry Mark
099     * @author Bruno Farache
100     * @author Shuyang Zhou
101     * @author Tina Tian
102     * @author Hugo Huijser
103     */
104    public class LuceneHelperImpl implements LuceneHelper {
105    
106            public void addDocument(long companyId, Document document)
107                    throws IOException {
108    
109                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
110    
111                    indexAccessor.addDocument(document);
112            }
113    
114            public void addExactTerm(
115                    BooleanQuery booleanQuery, String field, String value) {
116    
117                    addTerm(booleanQuery, field, value, false);
118            }
119    
120            public void addNumericRangeTerm(
121                    BooleanQuery booleanQuery, String field, String startValue,
122                    String endValue) {
123    
124                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
125                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
126                            true, true);
127    
128                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
129            }
130    
131            public void addRangeTerm(
132                    BooleanQuery booleanQuery, String field, String startValue,
133                    String endValue) {
134    
135                    boolean includesLower = true;
136    
137                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
138                            includesLower = false;
139                    }
140    
141                    boolean includesUpper = true;
142    
143                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
144                            includesUpper = false;
145                    }
146    
147                    TermRangeQuery termRangeQuery = new TermRangeQuery(
148                            field, startValue, endValue, includesLower, includesUpper);
149    
150                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
151            }
152    
153            public void addRequiredTerm(
154                    BooleanQuery booleanQuery, String field, String value, boolean like) {
155    
156                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
157            }
158    
159            public void addRequiredTerm(
160                    BooleanQuery booleanQuery, String field, String[] values,
161                    boolean like) {
162    
163                    if (values == null) {
164                            return;
165                    }
166    
167                    BooleanQuery query = new BooleanQuery();
168    
169                    for (String value : values) {
170                            addTerm(query, field, value, like);
171                    }
172    
173                    booleanQuery.add(query, BooleanClause.Occur.MUST);
174            }
175    
176            public void addTerm(
177                    BooleanQuery booleanQuery, String field, String value, boolean like) {
178    
179                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
180            }
181    
182            public void addTerm(
183                    BooleanQuery booleanQuery, String field, String value, boolean like,
184                    BooleanClauseOccur booleanClauseOccur) {
185    
186                    if (Validator.isNull(value)) {
187                            return;
188                    }
189    
190                    Analyzer analyzer = getAnalyzer();
191    
192                    if (analyzer instanceof PerFieldAnalyzer) {
193                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
194    
195                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
196    
197                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
198                                    like = true;
199                            }
200                    }
201    
202                    if (like) {
203                            value = StringUtil.replace(
204                                    value, StringPool.PERCENT, StringPool.BLANK);
205                    }
206    
207                    try {
208                            QueryParser queryParser = new QueryParser(
209                                    getVersion(), field, analyzer);
210    
211                            Query query = null;
212    
213                            try {
214                                    if (like) {
215    
216                                            // LUCENE-89
217    
218                                            value = value.toLowerCase(queryParser.getLocale());
219                                    }
220    
221                                    query = queryParser.parse(value);
222                            }
223                            catch (Exception e) {
224                                    query = queryParser.parse(KeywordsUtil.escape(value));
225                            }
226    
227                            BooleanClause.Occur occur = null;
228    
229                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
230                                    occur = BooleanClause.Occur.MUST;
231                            }
232                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
233                                    occur = BooleanClause.Occur.MUST_NOT;
234                            }
235                            else {
236                                    occur = BooleanClause.Occur.SHOULD;
237                            }
238    
239                            _includeIfUnique(booleanQuery, query, occur, like);
240                    }
241                    catch (Exception e) {
242                            _log.error(e, e);
243                    }
244            }
245    
246            public void addTerm(
247                    BooleanQuery booleanQuery, String field, String[] values,
248                    boolean like) {
249    
250                    for (String value : values) {
251                            addTerm(booleanQuery, field, value, like);
252                    }
253            }
254    
255            public int countScoredFieldNames(Query query, String[] filedNames) {
256                    int count = 0;
257    
258                    for (String fieldName : filedNames) {
259                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
260                                    query, false, fieldName);
261    
262                            if ((weightedTerms.length > 0) &&
263                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
264    
265                                    count++;
266                            }
267                    }
268    
269                    return count;
270            }
271    
272            public void delete(long companyId) {
273                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
274    
275                    if (indexAccessor == null) {
276                            return;
277                    }
278    
279                    indexAccessor.delete();
280            }
281    
282            public void deleteDocuments(long companyId, Term term) throws IOException {
283                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
284    
285                    if (indexAccessor == null) {
286                            return;
287                    }
288    
289                    indexAccessor.deleteDocuments(term);
290            }
291    
292            public void dumpIndex(long companyId, OutputStream outputStream)
293                    throws IOException {
294    
295                    long lastGeneration = getLastGeneration(companyId);
296    
297                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
298                            if (_log.isDebugEnabled()) {
299                                    _log.debug(
300                                            "Dump index from cluster is not enabled for " + companyId);
301                            }
302    
303                            return;
304                    }
305    
306                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
307    
308                    if (indexAccessor == null) {
309                            return;
310                    }
311    
312                    indexAccessor.dumpIndex(outputStream);
313            }
314    
315            public Analyzer getAnalyzer() {
316                    return _analyzer;
317            }
318    
319            public long getLastGeneration(long companyId) {
320                    if (!isLoadIndexFromClusterEnabled()) {
321                            return IndexAccessor.DEFAULT_LAST_GENERATION;
322                    }
323    
324                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
325    
326                    if (indexAccessor == null) {
327                            return IndexAccessor.DEFAULT_LAST_GENERATION;
328                    }
329    
330                    return indexAccessor.getLastGeneration();
331            }
332    
333            public InputStream getLoadIndexesInputStreamFromCluster(
334                            long companyId, Address bootupAddress)
335                    throws SystemException {
336    
337                    if (!isLoadIndexFromClusterEnabled()) {
338                            return null;
339                    }
340    
341                    InputStream inputStream = null;
342    
343                    try {
344                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
345                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
346    
347                            URL url = bootupClusterNodeObjectValuePair.getValue();
348    
349                            URLConnection urlConnection = url.openConnection();
350    
351                            urlConnection.setDoOutput(true);
352    
353                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
354                                    urlConnection.getOutputStream());
355    
356                            unsyncPrintWriter.write("transientToken=");
357                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
358                            unsyncPrintWriter.write("&companyId=");
359                            unsyncPrintWriter.write(String.valueOf(companyId));
360    
361                            unsyncPrintWriter.close();
362    
363                            inputStream = urlConnection.getInputStream();
364    
365                            return inputStream;
366                    }
367                    catch (IOException ioe) {
368                            throw new SystemException(ioe);
369                    }
370            }
371    
372            public Set<String> getQueryTerms(Query query) {
373                    String queryString = StringUtil.replace(
374                            query.toString(), StringPool.STAR, StringPool.BLANK);
375    
376                    Query tempQuery = null;
377    
378                    try {
379                            QueryParser queryParser = new QueryParser(
380                                    getVersion(), StringPool.BLANK, getAnalyzer());
381    
382                            tempQuery = queryParser.parse(queryString);
383                    }
384                    catch (Exception e) {
385                            if (_log.isWarnEnabled()) {
386                                    _log.warn("Unable to parse " + queryString);
387                            }
388    
389                            tempQuery = query;
390                    }
391    
392                    WeightedTerm[] weightedTerms = null;
393    
394                    for (String fieldName : Field.KEYWORDS) {
395                            weightedTerms = QueryTermExtractor.getTerms(
396                                    tempQuery, false, fieldName);
397    
398                            if (weightedTerms.length > 0) {
399                                    break;
400                            }
401                    }
402    
403                    Set<String> queryTerms = new HashSet<String>();
404    
405                    for (WeightedTerm weightedTerm : weightedTerms) {
406                            queryTerms.add(weightedTerm.getTerm());
407                    }
408    
409                    return queryTerms;
410            }
411    
412            public IndexSearcher getSearcher(long companyId, boolean readOnly)
413                    throws IOException {
414    
415                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
416    
417                    IndexReader indexReader = IndexReader.open(
418                            indexAccessor.getLuceneDir(), readOnly);
419    
420                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
421    
422                    indexSearcher.setDefaultFieldSortScoring(true, true);
423                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
424    
425                    return indexSearcher;
426            }
427    
428            public String getSnippet(
429                            Query query, String field, String s, int maxNumFragments,
430                            int fragmentLength, String fragmentSuffix, Formatter formatter)
431                    throws IOException {
432    
433                    QueryScorer queryScorer = new QueryScorer(query, field);
434    
435                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
436    
437                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
438    
439                    TokenStream tokenStream = getAnalyzer().tokenStream(
440                            field, new UnsyncStringReader(s));
441    
442                    try {
443                            String snippet = highlighter.getBestFragments(
444                                    tokenStream, s, maxNumFragments, fragmentSuffix);
445    
446                            if (Validator.isNotNull(snippet) &&
447                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
448                                    !s.equals(snippet)) {
449    
450                                    snippet = snippet.concat(fragmentSuffix);
451                            }
452    
453                            return snippet;
454                    }
455                    catch (InvalidTokenOffsetsException itoe) {
456                            throw new IOException(itoe.getMessage());
457                    }
458            }
459    
460            public Version getVersion() {
461                    return _version;
462            }
463    
464            public boolean isLoadIndexFromClusterEnabled() {
465                    if (PropsValues.CLUSTER_LINK_ENABLED &&
466                            PropsValues.LUCENE_REPLICATE_WRITE) {
467    
468                            return true;
469                    }
470    
471                    if (_log.isDebugEnabled()) {
472                            _log.debug("Load index from cluster is not enabled");
473                    }
474    
475                    return false;
476            }
477    
478            public void loadIndex(long companyId, InputStream inputStream)
479                    throws IOException {
480    
481                    if (!isLoadIndexFromClusterEnabled()) {
482                            return;
483                    }
484    
485                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
486    
487                    if (indexAccessor == null) {
488                            if (_log.isInfoEnabled()) {
489                                    _log.info(
490                                            "Skip loading Lucene index files for company " + companyId +
491                                                    " in favor of lazy loading");
492                            }
493    
494                            return;
495                    }
496    
497                    StopWatch stopWatch = null;
498    
499                    if (_log.isInfoEnabled()) {
500                            _log.info(
501                                    "Start loading Lucene index files for company " + companyId);
502    
503                            stopWatch = new StopWatch();
504    
505                            stopWatch.start();
506                    }
507    
508                    indexAccessor.loadIndex(inputStream);
509    
510                    if (_log.isInfoEnabled()) {
511                            _log.info(
512                                    "Finished loading index files for company " + companyId +
513                                            " in " + stopWatch.getTime() + " ms");
514                    }
515            }
516    
517            public void loadIndexesFromCluster(long companyId) throws SystemException {
518                    if (!isLoadIndexFromClusterEnabled()) {
519                            return;
520                    }
521    
522                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
523    
524                    if (indexAccessor == null) {
525                            return;
526                    }
527    
528                    long localLastGeneration = getLastGeneration(companyId);
529    
530                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
531            }
532    
533            public void setAnalyzer(Analyzer analyzer) {
534                    _analyzer = analyzer;
535            }
536    
537            public void setVersion(Version version) {
538                    _version = version;
539            }
540    
541            public void shutdown() {
542                    if (_luceneIndexThreadPoolExecutor != null) {
543                            _luceneIndexThreadPoolExecutor.shutdownNow();
544    
545                            try {
546                                    _luceneIndexThreadPoolExecutor.awaitTermination(
547                                            60, TimeUnit.SECONDS);
548                            }
549                            catch (InterruptedException ie) {
550                                    _log.error("Lucene indexer shutdown interrupted", ie);
551                            }
552                    }
553    
554                    if (isLoadIndexFromClusterEnabled()) {
555                            ClusterExecutorUtil.removeClusterEventListener(
556                                    _loadIndexClusterEventListener);
557                    }
558    
559                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
560                            indexAccessor.close();
561                    }
562            }
563    
564            public void startup(long companyId) {
565                    if (PropsValues.INDEX_ON_STARTUP) {
566                            if (_log.isInfoEnabled()) {
567                                    _log.info("Indexing Lucene on startup");
568                            }
569    
570                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
571    
572                            if (PropsValues.INDEX_WITH_THREAD) {
573                                    if (_luceneIndexThreadPoolExecutor == null) {
574    
575                                            // This should never be null except for the case where
576                                            // VerifyProcessUtil#_verifyProcess(boolean) sets
577                                            // PropsValues#INDEX_ON_STARTUP to true.
578    
579                                            _luceneIndexThreadPoolExecutor =
580                                                    PortalExecutorManagerUtil.getPortalExecutor(
581                                                            LuceneHelperImpl.class.getName());
582                                    }
583    
584                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
585                            }
586                            else {
587                                    luceneIndexer.reindex();
588                            }
589                    }
590            }
591    
592            public void updateDocument(long companyId, Term term, Document document)
593                    throws IOException {
594    
595                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
596    
597                    indexAccessor.updateDocument(term, document);
598            }
599    
/**
 * Creates the helper. The indexing thread pool is obtained up front when
 * indexing on startup with a thread is configured, and a
 * <code>LoadIndexClusterEventListener</code> is registered when loading
 * indexes from the cluster is enabled.
 */
private LuceneHelperImpl() {
    boolean indexWithThreadOnStartup =
        PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

    if (indexWithThreadOnStartup) {
        _luceneIndexThreadPoolExecutor =
            PortalExecutorManagerUtil.getPortalExecutor(
                LuceneHelperImpl.class.getName());
    }

    if (!isLoadIndexFromClusterEnabled()) {
        return;
    }

    _loadIndexClusterEventListener = new LoadIndexClusterEventListener();

    ClusterExecutorUtil.addClusterEventListener(
        _loadIndexClusterEventListener);
}
615    
616            private ObjectValuePair<String, URL>
617                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
618                    throws SystemException {
619    
620                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
621                            new MethodHandler(
622                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
623                            bootupAddress);
624    
625                    FutureClusterResponses futureClusterResponses =
626                            ClusterExecutorUtil.execute(clusterRequest);
627    
628                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
629                            futureClusterResponses.getPartialResults();
630    
631                    try {
632                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
633                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
634    
635                            String transientToken = (String)clusterNodeResponse.getResult();
636    
637                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
638    
639                            InetAddress inetAddress = clusterNode.getInetAddress();
640    
641                            String fileName = PortalUtil.getPathContext();
642    
643                            if (!fileName.endsWith(StringPool.SLASH)) {
644                                    fileName = fileName.concat(StringPool.SLASH);
645                            }
646    
647                            fileName = fileName.concat("lucene/dump");
648    
649                            URL url = new URL(
650                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
651                                    fileName);
652    
653                            return new ObjectValuePair<String, URL>(transientToken, url);
654                    }
655                    catch (Exception e) {
656                            throw new SystemException(e);
657                    }
658            }
659    
660            private IndexAccessor _getIndexAccessor(long companyId) {
661                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
662    
663                    if (indexAccessor != null) {
664                            return indexAccessor;
665                    }
666    
667                    synchronized (this) {
668                            indexAccessor = _indexAccessors.get(companyId);
669    
670                            if (indexAccessor == null) {
671                                    indexAccessor = new IndexAccessorImpl(companyId);
672    
673                                    if (isLoadIndexFromClusterEnabled()) {
674                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
675                                                    MessageValuesThreadLocal.getValue(
676                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
677    
678                                            if (clusterForwardMessage) {
679                                                    if (_log.isInfoEnabled()) {
680                                                            _log.info(
681                                                                    "Skip Luncene index files cluster loading " +
682                                                                            "since this is a manual reindex request");
683                                                    }
684                                            }
685                                            else {
686                                                    indexAccessor = new SynchronizedIndexAccessorImpl(
687                                                            indexAccessor);
688    
689                                                    try {
690                                                            _loadIndexFromCluster(
691                                                                    indexAccessor,
692                                                                    indexAccessor.getLastGeneration());
693                                                    }
694                                                    catch (Exception e) {
695                                                            _log.error(
696                                                                    "Unable to load index for company " +
697                                                                            indexAccessor.getCompanyId(),
698                                                                    e);
699                                                    }
700                                            }
701                                    }
702    
703                                    _indexAccessors.put(companyId, indexAccessor);
704                            }
705                    }
706    
707                    return indexAccessor;
708            }
709    
710            private void _includeIfUnique(
711                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
712                    boolean like) {
713    
714                    if (query instanceof TermQuery) {
715                            Set<Term> terms = new HashSet<Term>();
716    
717                            query.extractTerms(terms);
718    
719                            for (Term term : terms) {
720                                    String termValue = term.text();
721    
722                                    if (like) {
723                                            term = term.createTerm(
724                                                    StringPool.STAR.concat(termValue).concat(
725                                                            StringPool.STAR));
726    
727                                            query = new WildcardQuery(term);
728                                    }
729                                    else {
730                                            query = new TermQuery(term);
731                                    }
732    
733                                    boolean included = false;
734    
735                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
736                                            if (query.equals(booleanClause.getQuery())) {
737                                                    included = true;
738                                            }
739                                    }
740    
741                                    if (!included) {
742                                            booleanQuery.add(query, occur);
743                                    }
744                            }
745                    }
746                    else if (query instanceof BooleanQuery) {
747                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
748    
749                            BooleanQuery containerBooleanQuery = new BooleanQuery();
750    
751                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
752                                    _includeIfUnique(
753                                            containerBooleanQuery, booleanClause.getQuery(),
754                                            booleanClause.getOccur(), like);
755                            }
756    
757                            if (containerBooleanQuery.getClauses().length > 0) {
758                                    booleanQuery.add(containerBooleanQuery, occur);
759                            }
760                    }
761                    else {
762                            boolean included = false;
763    
764                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
765                                    if (query.equals(booleanClause.getQuery())) {
766                                            included = true;
767                                    }
768                            }
769    
770                            if (!included) {
771                                    booleanQuery.add(query, occur);
772                            }
773                    }
774            }
775    
776            private void _loadIndexFromCluster(
777                            IndexAccessor indexAccessor, long localLastGeneration)
778                    throws SystemException {
779    
780                    List<Address> clusterNodeAddresses =
781                            ClusterExecutorUtil.getClusterNodeAddresses();
782    
783                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
784    
785                    if (clusterNodeAddressesCount <= 1) {
786                            if (_log.isDebugEnabled()) {
787                                    _log.debug(
788                                            "Do not load indexes because there is either one portal " +
789                                                    "instance or no portal instances in the cluster");
790                            }
791    
792                            return;
793                    }
794    
795                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
796                            new MethodHandler(
797                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
798                            true);
799    
800                    ClusterExecutorUtil.execute(
801                            clusterRequest,
802                            new LoadIndexClusterResponseCallback(
803                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
804            }
805    
        // Time in milliseconds to wait for a single cluster node response when
        // polling the response queue for a node to load the index from

        private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;

        // NOTE(review): not referenced in this part of the file; presumably the
        // keep alive time in milliseconds of tokens created through
        // _createTokenMethodKey -- confirm against its usage

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for TransientTokenUtil.createToken(long) and
        // LuceneHelperUtil.getLastGeneration(long). The latter is multicast to
        // the cluster when looking for a node with a newer index; the former is
        // presumably invoked remotely as well -- confirm against its usage.

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        // Instance state. _indexAccessors maps a company ID to its index
        // accessor.

        private Analyzer _analyzer;
        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
823    
824            private class LoadIndexClusterEventListener
825                    implements ClusterEventListener {
826    
827                    public void processClusterEvent(ClusterEvent clusterEvent) {
828                            ClusterEventType clusterEventType =
829                                    clusterEvent.getClusterEventType();
830    
831                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
832                                    return;
833                            }
834    
835                            List<Address> clusterNodeAddresses =
836                                    ClusterExecutorUtil.getClusterNodeAddresses();
837                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
838    
839                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
840                                    if (_log.isDebugEnabled()) {
841                                            _log.debug(
842                                                    "Number of original cluster members is greater than " +
843                                                            "one");
844                                    }
845    
846                                    return;
847                            }
848    
849                            long[] companyIds = PortalInstances.getCompanyIds();
850    
851                            for (long companyId : companyIds) {
852                                    loadIndexes(companyId);
853                            }
854    
855                            loadIndexes(CompanyConstants.SYSTEM);
856                    }
857    
858                    private void loadIndexes(long companyId) {
859                            long lastGeneration = getLastGeneration(companyId);
860    
861                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
862                                    return;
863                            }
864    
865                            try {
866                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
867                            }
868                            catch (Exception e) {
869                                    _log.error(
870                                            "Unable to load indexes for company " + companyId, e);
871                            }
872                    }
873    
874            }
875    
876            private class LoadIndexClusterResponseCallback
877                    extends BaseClusterResponseCallback {
878    
879                    public LoadIndexClusterResponseCallback(
880                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
881                            long localLastGeneration) {
882    
883                            _indexAccessor = indexAccessor;
884                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
885                            _localLastGeneration = localLastGeneration;
886    
887                            _companyId = _indexAccessor.getCompanyId();
888                    }
889    
890                    @Override
891                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
892                            Address bootupAddress = null;
893    
894                            do {
895                                    _clusterNodeAddressesCount--;
896    
897                                    ClusterNodeResponse clusterNodeResponse = null;
898    
899                                    try {
900                                            clusterNodeResponse = blockingQueue.poll(
901                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
902                                                    TimeUnit.MILLISECONDS);
903                                    }
904                                    catch (Exception e) {
905                                            _log.error("Unable to get cluster node response", e);
906                                    }
907    
908                                    if (clusterNodeResponse == null) {
909                                            if (_log.isDebugEnabled()) {
910                                                    _log.debug(
911                                                            "Unable to get cluster node response in " +
912                                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
913                                                                            TimeUnit.MILLISECONDS);
914                                            }
915    
916                                            continue;
917                                    }
918    
919                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
920    
921                                    if (clusterNode.getPort() > 0) {
922                                            try {
923                                                    long remoteLastGeneration =
924                                                            (Long)clusterNodeResponse.getResult();
925    
926                                                    if (remoteLastGeneration > _localLastGeneration) {
927                                                            bootupAddress = clusterNodeResponse.getAddress();
928    
929                                                            break;
930                                                    }
931                                            }
932                                            catch (Exception e) {
933                                                    if (_log.isDebugEnabled()) {
934                                                            _log.debug(
935                                                                    "Suppress exception caused by remote method " +
936                                                                            "invocation",
937                                                                    e);
938                                                    }
939    
940                                                    continue;
941                                            }
942                                    }
943                                    else {
944                                            if (_log.isDebugEnabled()) {
945                                                    _log.debug(
946                                                            "Cluster node " + clusterNode +
947                                                                    " has invalid port");
948                                            }
949                                    }
950                            }
951                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
952    
953                            if (bootupAddress == null) {
954                                    return;
955                            }
956    
957                            if (_log.isInfoEnabled()) {
958                                    _log.info(
959                                            "Start loading lucene index files from cluster node " +
960                                                    bootupAddress);
961                            }
962    
963                            InputStream inputStream = null;
964    
965                            try {
966                                    inputStream = getLoadIndexesInputStreamFromCluster(
967                                            _companyId, bootupAddress);
968    
969                                    _indexAccessor.loadIndex(inputStream);
970    
971                                    if (_log.isInfoEnabled()) {
972                                            _log.info("Lucene index files loaded successfully");
973                                    }
974                            }
975                            catch (Exception e) {
976                                    _log.error("Unable to load index for company " + _companyId, e);
977                            }
978                            finally {
979                                    if (inputStream != null) {
980                                            try {
981                                                    inputStream.close();
982                                            }
983                                            catch (IOException ioe) {
984                                                    _log.error(
985                                                            "Unable to close input stream for company " +
986                                                                    _companyId,
987                                                            ioe);
988                                            }
989                                    }
990                            }
991                    }
992    
993                    @Override
994                    public void processTimeoutException(TimeoutException timeoutException) {
995                            _log.error(
996                                    "Uanble to load index for company " + _companyId,
997                                    timeoutException);
998                    }
999    
1000                    private int _clusterNodeAddressesCount;
1001                    private long _companyId;
1002                    private IndexAccessor _indexAccessor;
1003                    private long _localLastGeneration;
1004    
1005            }
1006    
1007    }