001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.PropsKeys;
044    import com.liferay.portal.kernel.util.StringPool;
045    import com.liferay.portal.kernel.util.StringUtil;
046    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047    import com.liferay.portal.kernel.util.Validator;
048    import com.liferay.portal.model.CompanyConstants;
049    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051    import com.liferay.portal.security.auth.TransientTokenUtil;
052    import com.liferay.portal.util.PortalInstances;
053    import com.liferay.portal.util.PortalUtil;
054    import com.liferay.portal.util.PropsUtil;
055    import com.liferay.portal.util.PropsValues;
056    import com.liferay.util.lucene.KeywordsUtil;
057    
058    import java.io.IOException;
059    import java.io.InputStream;
060    import java.io.OutputStream;
061    
062    import java.net.InetAddress;
063    import java.net.URL;
064    import java.net.URLConnection;
065    
066    import java.util.HashSet;
067    import java.util.List;
068    import java.util.Map;
069    import java.util.Set;
070    import java.util.concurrent.BlockingQueue;
071    import java.util.concurrent.ConcurrentHashMap;
072    import java.util.concurrent.TimeUnit;
073    import java.util.concurrent.TimeoutException;
074    
075    import org.apache.commons.lang.time.StopWatch;
076    import org.apache.lucene.analysis.Analyzer;
077    import org.apache.lucene.analysis.TokenStream;
078    import org.apache.lucene.document.Document;
079    import org.apache.lucene.index.IndexReader;
080    import org.apache.lucene.index.Term;
081    import org.apache.lucene.queryParser.QueryParser;
082    import org.apache.lucene.search.BooleanClause;
083    import org.apache.lucene.search.BooleanQuery;
084    import org.apache.lucene.search.IndexSearcher;
085    import org.apache.lucene.search.NumericRangeQuery;
086    import org.apache.lucene.search.Query;
087    import org.apache.lucene.search.TermQuery;
088    import org.apache.lucene.search.TermRangeQuery;
089    import org.apache.lucene.search.WildcardQuery;
090    import org.apache.lucene.search.highlight.Highlighter;
091    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
092    import org.apache.lucene.search.highlight.QueryScorer;
093    import org.apache.lucene.search.highlight.SimpleFragmenter;
094    import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
095    import org.apache.lucene.search.highlight.WeightedTerm;
096    import org.apache.lucene.util.Version;
097    
098    /**
099     * @author Brian Wing Shun Chan
100     * @author Harry Mark
101     * @author Bruno Farache
102     * @author Shuyang Zhou
103     * @author Tina Tian
104     * @author Hugo Huijser
105     */
106    public class LuceneHelperImpl implements LuceneHelper {
107    
108            @Override
109            public void addDocument(long companyId, Document document)
110                    throws IOException {
111    
112                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
113    
114                    indexAccessor.addDocument(document);
115            }
116    
117            @Override
118            public void addExactTerm(
119                    BooleanQuery booleanQuery, String field, String value) {
120    
121                    addTerm(booleanQuery, field, value, false);
122            }
123    
124            @Override
125            public void addNumericRangeTerm(
126                    BooleanQuery booleanQuery, String field, String startValue,
127                    String endValue) {
128    
129                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
130                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
131                            true, true);
132    
133                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134            }
135    
136            @Override
137            public void addRangeTerm(
138                    BooleanQuery booleanQuery, String field, String startValue,
139                    String endValue) {
140    
141                    boolean includesLower = true;
142    
143                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
144                            includesLower = false;
145                    }
146    
147                    boolean includesUpper = true;
148    
149                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
150                            includesUpper = false;
151                    }
152    
153                    TermRangeQuery termRangeQuery = new TermRangeQuery(
154                            field, startValue, endValue, includesLower, includesUpper);
155    
156                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
157            }
158    
159            @Override
160            public void addRequiredTerm(
161                    BooleanQuery booleanQuery, String field, String value, boolean like) {
162    
163                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
164            }
165    
166            @Override
167            public void addRequiredTerm(
168                    BooleanQuery booleanQuery, String field, String[] values,
169                    boolean like) {
170    
171                    if (values == null) {
172                            return;
173                    }
174    
175                    BooleanQuery query = new BooleanQuery();
176    
177                    for (String value : values) {
178                            addTerm(query, field, value, like);
179                    }
180    
181                    booleanQuery.add(query, BooleanClause.Occur.MUST);
182            }
183    
184            @Override
185            public void addTerm(
186                    BooleanQuery booleanQuery, String field, String value, boolean like) {
187    
188                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
189            }
190    
        /**
         * Parses the value into a Lucene query for the field and hands it to
         * <code>_includeIfUnique</code> together with the mapped occurrence.
         *
         * <p>
         * Values for which <code>Validator.isNull</code> returns
         * <code>true</code> are ignored. Any exception raised while building the
         * query is logged and not propagated.
         * </p>
         *
         * @param booleanQuery the query receiving the new clause
         * @param field the name of the field to match
         * @param value the raw value to parse
         * @param like whether "like" matching is requested; forced to
         *        <code>true</code> when the field's analyzer is a
         *        <code>LikeKeywordAnalyzer</code>
         * @param booleanClauseOccur the requested occurrence; anything other
         *        than <code>MUST</code> or <code>MUST_NOT</code> maps to
         *        <code>SHOULD</code>
         */
        @Override
        public void addTerm(
                BooleanQuery booleanQuery, String field, String value, boolean like,
                BooleanClauseOccur booleanClauseOccur) {

                if (Validator.isNull(value)) {
                        return;
                }

                Analyzer analyzer = getAnalyzer();

                // A field analyzed with LikeKeywordAnalyzer is always treated as a
                // "like" match, regardless of the caller's flag

                if (analyzer instanceof PerFieldAnalyzer) {
                        PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;

                        Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);

                        if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
                                like = true;
                        }
                }

                // Percent characters are stripped from "like" values before parsing

                if (like) {
                        value = StringUtil.replace(
                                value, StringPool.PERCENT, StringPool.BLANK);
                }

                try {
                        QueryParser queryParser = new QueryParser(
                                getVersion(), field, analyzer);

                        Query query = queryParser.parse(value);

                        // NOTE(review): the parse above sits outside the inner try, so a
                        // parse failure goes straight to the outer catch and the escaped
                        // fallback below only covers failures of the lower casing step.
                        // Confirm whether the fallback was meant to cover the parse.

                        try {
                                if (like && (query instanceof TermQuery)) {

                                        // LUCENE-89

                                        TermQuery termQuery = (TermQuery)query;

                                        Term term = termQuery.getTerm();

                                        // Rebuild the term with its text lower cased in the
                                        // parser's locale

                                        value = term.text();
                                        value = value.toLowerCase(queryParser.getLocale());

                                        query = new TermQuery(term.createTerm(value));
                                }
                        }
                        catch (Exception e) {
                                query = queryParser.parse(KeywordsUtil.escape(value));
                        }

                        // Translate the portal occurrence into its Lucene counterpart

                        BooleanClause.Occur occur = null;

                        if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
                                occur = BooleanClause.Occur.MUST;
                        }
                        else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
                                occur = BooleanClause.Occur.MUST_NOT;
                        }
                        else {
                                occur = BooleanClause.Occur.SHOULD;
                        }

                        _includeIfUnique(booleanQuery, query, occur, like);
                }
                catch (Exception e) {

                        // Best effort: a value that cannot be turned into a query is
                        // logged and skipped

                        _log.error(e, e);
                }
        }
260    
261            @Override
262            public void addTerm(
263                    BooleanQuery booleanQuery, String field, String[] values,
264                    boolean like) {
265    
266                    for (String value : values) {
267                            addTerm(booleanQuery, field, value, like);
268                    }
269            }
270    
271            @Override
272            public int countScoredFieldNames(Query query, String[] filedNames) {
273                    int count = 0;
274    
275                    for (String fieldName : filedNames) {
276                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
277                                    query, false, fieldName);
278    
279                            if ((weightedTerms.length > 0) &&
280                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
281    
282                                    count++;
283                            }
284                    }
285    
286                    return count;
287            }
288    
289            @Override
290            public void delete(long companyId) {
291                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
292    
293                    if (indexAccessor == null) {
294                            return;
295                    }
296    
297                    indexAccessor.delete();
298            }
299    
300            @Override
301            public void deleteDocuments(long companyId, Term term) throws IOException {
302                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
303    
304                    if (indexAccessor == null) {
305                            return;
306                    }
307    
308                    indexAccessor.deleteDocuments(term);
309            }
310    
311            @Override
312            public void dumpIndex(long companyId, OutputStream outputStream)
313                    throws IOException {
314    
315                    long lastGeneration = getLastGeneration(companyId);
316    
317                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
318                            if (_log.isDebugEnabled()) {
319                                    _log.debug(
320                                            "Dump index from cluster is not enabled for " + companyId);
321                            }
322    
323                            return;
324                    }
325    
326                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
327    
328                    if (indexAccessor == null) {
329                            return;
330                    }
331    
332                    indexAccessor.dumpIndex(outputStream);
333            }
334    
335            @Override
336            public Analyzer getAnalyzer() {
337                    return _analyzer;
338            }
339    
340            @Override
341            public long getLastGeneration(long companyId) {
342                    if (!isLoadIndexFromClusterEnabled()) {
343                            return IndexAccessor.DEFAULT_LAST_GENERATION;
344                    }
345    
346                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347    
348                    if (indexAccessor == null) {
349                            return IndexAccessor.DEFAULT_LAST_GENERATION;
350                    }
351    
352                    return indexAccessor.getLastGeneration();
353            }
354    
355            @Override
356            public InputStream getLoadIndexesInputStreamFromCluster(
357                            long companyId, Address bootupAddress)
358                    throws SystemException {
359    
360                    if (!isLoadIndexFromClusterEnabled()) {
361                            return null;
362                    }
363    
364                    InputStream inputStream = null;
365    
366                    try {
367                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
368                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
369    
370                            URL url = bootupClusterNodeObjectValuePair.getValue();
371    
372                            URLConnection urlConnection = url.openConnection();
373    
374                            urlConnection.setDoOutput(true);
375    
376                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
377                                    urlConnection.getOutputStream());
378    
379                            unsyncPrintWriter.write("transientToken=");
380                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
381                            unsyncPrintWriter.write("&companyId=");
382                            unsyncPrintWriter.write(String.valueOf(companyId));
383    
384                            unsyncPrintWriter.close();
385    
386                            inputStream = urlConnection.getInputStream();
387    
388                            return inputStream;
389                    }
390                    catch (IOException ioe) {
391                            throw new SystemException(ioe);
392                    }
393            }
394    
395            @Override
396            public String[] getQueryTerms(Query query) {
397                    String queryString = StringUtil.replace(
398                            query.toString(), StringPool.STAR, StringPool.BLANK);
399    
400                    Query tempQuery = null;
401    
402                    try {
403                            QueryParser queryParser = new QueryParser(
404                                    getVersion(), StringPool.BLANK, getAnalyzer());
405    
406                            tempQuery = queryParser.parse(queryString);
407                    }
408                    catch (Exception e) {
409                            if (_log.isWarnEnabled()) {
410                                    _log.warn("Unable to parse " + queryString);
411                            }
412    
413                            tempQuery = query;
414                    }
415    
416                    WeightedTerm[] weightedTerms = null;
417    
418                    for (String fieldName : Field.KEYWORDS) {
419                            weightedTerms = QueryTermExtractor.getTerms(
420                                    tempQuery, false, fieldName);
421    
422                            if (weightedTerms.length > 0) {
423                                    break;
424                            }
425                    }
426    
427                    Set<String> queryTerms = new HashSet<String>();
428    
429                    for (WeightedTerm weightedTerm : weightedTerms) {
430                            queryTerms.add(weightedTerm.getTerm());
431                    }
432    
433                    return queryTerms.toArray(new String[queryTerms.size()]);
434            }
435    
436            @Override
437            public IndexSearcher getSearcher(long companyId, boolean readOnly)
438                    throws IOException {
439    
440                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
441    
442                    IndexReader indexReader = IndexReader.open(
443                            indexAccessor.getLuceneDir(), readOnly);
444    
445                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
446    
447                    indexSearcher.setDefaultFieldSortScoring(true, true);
448                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
449    
450                    return indexSearcher;
451            }
452    
453            @Override
454            public String getSnippet(
455                            Query query, String field, String s, int maxNumFragments,
456                            int fragmentLength, String fragmentSuffix, String preTag,
457                            String postTag)
458                    throws IOException {
459    
460                    SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
461                            preTag, postTag);
462    
463                    QueryScorer queryScorer = new QueryScorer(query, field);
464    
465                    Highlighter highlighter = new Highlighter(
466                            simpleHTMLFormatter, queryScorer);
467    
468                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
469    
470                    TokenStream tokenStream = getAnalyzer().tokenStream(
471                            field, new UnsyncStringReader(s));
472    
473                    try {
474                            String snippet = highlighter.getBestFragments(
475                                    tokenStream, s, maxNumFragments, fragmentSuffix);
476    
477                            if (Validator.isNotNull(snippet) &&
478                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
479                                    !s.equals(snippet)) {
480    
481                                    snippet = snippet.concat(fragmentSuffix);
482                            }
483    
484                            return snippet;
485                    }
486                    catch (InvalidTokenOffsetsException itoe) {
487                            throw new IOException(itoe.getMessage());
488                    }
489            }
490    
491            @Override
492            public Version getVersion() {
493                    return _version;
494            }
495    
496            @Override
497            public boolean isLoadIndexFromClusterEnabled() {
498                    if (PropsValues.CLUSTER_LINK_ENABLED &&
499                            PropsValues.LUCENE_REPLICATE_WRITE) {
500    
501                            return true;
502                    }
503    
504                    if (_log.isDebugEnabled()) {
505                            _log.debug("Load index from cluster is not enabled");
506                    }
507    
508                    return false;
509            }
510    
511            @Override
512            public void loadIndex(long companyId, InputStream inputStream)
513                    throws IOException {
514    
515                    if (!isLoadIndexFromClusterEnabled()) {
516                            return;
517                    }
518    
519                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
520    
521                    if (indexAccessor == null) {
522                            if (_log.isInfoEnabled()) {
523                                    _log.info(
524                                            "Skip loading Lucene index files for company " + companyId +
525                                                    " in favor of lazy loading");
526                            }
527    
528                            return;
529                    }
530    
531                    StopWatch stopWatch = null;
532    
533                    if (_log.isInfoEnabled()) {
534                            _log.info(
535                                    "Start loading Lucene index files for company " + companyId);
536    
537                            stopWatch = new StopWatch();
538    
539                            stopWatch.start();
540                    }
541    
542                    indexAccessor.loadIndex(inputStream);
543    
544                    if (_log.isInfoEnabled()) {
545                            _log.info(
546                                    "Finished loading index files for company " + companyId +
547                                            " in " + stopWatch.getTime() + " ms");
548                    }
549            }
550    
551            @Override
552            public void loadIndexesFromCluster(long companyId) throws SystemException {
553                    if (!isLoadIndexFromClusterEnabled()) {
554                            return;
555                    }
556    
557                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
558    
559                    if (indexAccessor == null) {
560                            return;
561                    }
562    
563                    long localLastGeneration = getLastGeneration(companyId);
564    
565                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
566            }
567    
568            public void setAnalyzer(Analyzer analyzer) {
569                    _analyzer = analyzer;
570            }
571    
572            public void setVersion(Version version) {
573                    _version = version;
574            }
575    
576            @Override
577            public void shutdown() {
578                    if (_luceneIndexThreadPoolExecutor != null) {
579                            _luceneIndexThreadPoolExecutor.shutdownNow();
580    
581                            try {
582                                    _luceneIndexThreadPoolExecutor.awaitTermination(
583                                            60, TimeUnit.SECONDS);
584                            }
585                            catch (InterruptedException ie) {
586                                    _log.error("Lucene indexer shutdown interrupted", ie);
587                            }
588                    }
589    
590                    if (isLoadIndexFromClusterEnabled()) {
591                            ClusterExecutorUtil.removeClusterEventListener(
592                                    _loadIndexClusterEventListener);
593                    }
594    
595                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
596                            indexAccessor.close();
597                    }
598            }
599    
600            @Override
601            public void startup(long companyId) {
602                    if (!PropsValues.INDEX_ON_STARTUP) {
603                            return;
604                    }
605    
606                    if (_log.isInfoEnabled()) {
607                            _log.info("Indexing Lucene on startup");
608                    }
609    
610                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
611    
612                    if (PropsValues.INDEX_WITH_THREAD) {
613                            if (_luceneIndexThreadPoolExecutor == null) {
614    
615                                    // This should never be null except for the case where
616                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
617                                    // PropsValues#INDEX_ON_STARTUP to true.
618    
619                                    _luceneIndexThreadPoolExecutor =
620                                            PortalExecutorManagerUtil.getPortalExecutor(
621                                                    LuceneHelperImpl.class.getName());
622                            }
623    
624                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
625                    }
626                    else {
627                            luceneIndexer.reindex();
628                    }
629            }
630    
631            @Override
632            public void updateDocument(long companyId, Term term, Document document)
633                    throws IOException {
634    
635                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
636    
637                    indexAccessor.updateDocument(term, document);
638            }
639    
640            private LuceneHelperImpl() {
641                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
642                            _luceneIndexThreadPoolExecutor =
643                                    PortalExecutorManagerUtil.getPortalExecutor(
644                                            LuceneHelperImpl.class.getName());
645                    }
646    
647                    if (isLoadIndexFromClusterEnabled()) {
648                            _loadIndexClusterEventListener =
649                                    new LoadIndexClusterEventListener();
650    
651                            ClusterExecutorUtil.addClusterEventListener(
652                                    _loadIndexClusterEventListener);
653                    }
654    
655                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
656            }
657    
658            private ObjectValuePair<String, URL>
659                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
660                    throws SystemException {
661    
662                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
663                            new MethodHandler(
664                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
665                            bootupAddress);
666    
667                    FutureClusterResponses futureClusterResponses =
668                            ClusterExecutorUtil.execute(clusterRequest);
669    
670                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
671                            futureClusterResponses.getPartialResults();
672    
673                    try {
674                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
675                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
676                                    TimeUnit.MILLISECONDS);
677    
678                            String transientToken = (String)clusterNodeResponse.getResult();
679    
680                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
681    
682                            InetAddress inetAddress = clusterNode.getInetAddress();
683    
684                            String fileName = PortalUtil.getPathContext();
685    
686                            if (!fileName.endsWith(StringPool.SLASH)) {
687                                    fileName = fileName.concat(StringPool.SLASH);
688                            }
689    
690                            fileName = fileName.concat("lucene/dump");
691    
692                            URL url = new URL(
693                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
694                                    fileName);
695    
696                            return new ObjectValuePair<String, URL>(transientToken, url);
697                    }
698                    catch (Exception e) {
699                            throw new SystemException(e);
700                    }
701            }
702    
        /**
         * Returns the index accessor registered for the company, creating and
         * registering one on first use.
         *
         * <p>
         * When loading indexes from the cluster is enabled and the current
         * message is not a cluster forwarded one, the new accessor is wrapped in a
         * <code>SynchronizedIndexAccessorImpl</code> and an initial load from the
         * cluster is attempted. A failure of that load is logged and the accessor
         * is registered anyway.
         * </p>
         *
         * @param  companyId the ID of the company
         * @return the registered (possibly newly created) index accessor
         */
        private IndexAccessor _getIndexAccessor(long companyId) {

                // Fast path without locking for an already registered accessor

                IndexAccessor indexAccessor = _indexAccessors.get(companyId);

                if (indexAccessor != null) {
                        return indexAccessor;
                }

                synchronized (this) {

                        // Look again inside the lock: another thread may have registered
                        // the accessor in the meantime

                        indexAccessor = _indexAccessors.get(companyId);

                        if (indexAccessor == null) {
                                indexAccessor = new IndexAccessorImpl(companyId);

                                if (isLoadIndexFromClusterEnabled()) {
                                        boolean clusterForwardMessage = GetterUtil.getBoolean(
                                                MessageValuesThreadLocal.getValue(
                                                        ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));

                                        if (clusterForwardMessage) {
                                                if (_log.isInfoEnabled()) {
                                                        _log.info(
                                                                "Skip Luncene index files cluster loading " +
                                                                        "since this is a manual reindex request");
                                                }
                                        }
                                        else {
                                                indexAccessor = new SynchronizedIndexAccessorImpl(
                                                        indexAccessor);

                                                // Best effort: the accessor is still registered
                                                // below when the cluster load fails

                                                try {
                                                        _loadIndexFromCluster(
                                                                indexAccessor,
                                                                indexAccessor.getLastGeneration());
                                                }
                                                catch (Exception e) {
                                                        _log.error(
                                                                "Unable to load index for company " +
                                                                        indexAccessor.getCompanyId(),
                                                                e);
                                                }
                                        }
                                }

                                _indexAccessors.put(companyId, indexAccessor);
                        }
                }

                return indexAccessor;
        }
752    
753            private void _includeIfUnique(
754                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
755                    boolean like) {
756    
757                    if (query instanceof TermQuery) {
758                            Set<Term> terms = new HashSet<Term>();
759    
760                            query.extractTerms(terms);
761    
762                            for (Term term : terms) {
763                                    String termValue = term.text();
764    
765                                    if (like) {
766                                            term = term.createTerm(
767                                                    StringPool.STAR.concat(termValue).concat(
768                                                            StringPool.STAR));
769    
770                                            query = new WildcardQuery(term);
771                                    }
772                                    else {
773                                            query = new TermQuery(term);
774                                    }
775    
776                                    boolean included = false;
777    
778                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
779                                            if (query.equals(booleanClause.getQuery())) {
780                                                    included = true;
781                                            }
782                                    }
783    
784                                    if (!included) {
785                                            booleanQuery.add(query, occur);
786                                    }
787                            }
788                    }
789                    else if (query instanceof BooleanQuery) {
790                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
791    
792                            BooleanQuery containerBooleanQuery = new BooleanQuery();
793    
794                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
795                                    _includeIfUnique(
796                                            containerBooleanQuery, booleanClause.getQuery(),
797                                            booleanClause.getOccur(), like);
798                            }
799    
800                            if (containerBooleanQuery.getClauses().length > 0) {
801                                    booleanQuery.add(containerBooleanQuery, occur);
802                            }
803                    }
804                    else {
805                            boolean included = false;
806    
807                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
808                                    if (query.equals(booleanClause.getQuery())) {
809                                            included = true;
810                                    }
811                            }
812    
813                            if (!included) {
814                                    booleanQuery.add(query, occur);
815                            }
816                    }
817            }
818    
819            private void _loadIndexFromCluster(
820                            IndexAccessor indexAccessor, long localLastGeneration)
821                    throws SystemException {
822    
823                    List<Address> clusterNodeAddresses =
824                            ClusterExecutorUtil.getClusterNodeAddresses();
825    
826                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
827    
828                    if (clusterNodeAddressesCount <= 1) {
829                            if (_log.isDebugEnabled()) {
830                                    _log.debug(
831                                            "Do not load indexes because there is either one portal " +
832                                                    "instance or no portal instances in the cluster");
833                            }
834    
835                            return;
836                    }
837    
838                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
839                            new MethodHandler(
840                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
841                            true);
842    
843                    ClusterExecutorUtil.execute(
844                            clusterRequest,
845                            new LoadIndexClusterResponseCallback(
846                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
847            }
848    
849            private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
850                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
851    
852            private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
853                    GetterUtil.getInteger(
854                            PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
855                            BooleanQuery.getMaxClauseCount());
856    
857            private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
858    
859            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
860    
861            private static MethodKey _createTokenMethodKey =
862                    new MethodKey(TransientTokenUtil.class.getName(), "createToken",
863                    long.class);
864            private static MethodKey _getLastGenerationMethodKey =
865                    new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
866                    long.class);
867    
868            private Analyzer _analyzer;
869            private Map<Long, IndexAccessor> _indexAccessors =
870                    new ConcurrentHashMap<Long, IndexAccessor>();
871            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
872            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
873            private Version _version;
874    
875            private class LoadIndexClusterEventListener
876                    implements ClusterEventListener {
877    
878                    @Override
879                    public void processClusterEvent(ClusterEvent clusterEvent) {
880                            ClusterEventType clusterEventType =
881                                    clusterEvent.getClusterEventType();
882    
883                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
884                                    return;
885                            }
886    
887                            List<Address> clusterNodeAddresses =
888                                    ClusterExecutorUtil.getClusterNodeAddresses();
889                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
890    
891                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
892                                    if (_log.isDebugEnabled()) {
893                                            _log.debug(
894                                                    "Number of original cluster members is greater than " +
895                                                            "one");
896                                    }
897    
898                                    return;
899                            }
900    
901                            long[] companyIds = PortalInstances.getCompanyIds();
902    
903                            for (long companyId : companyIds) {
904                                    loadIndexes(companyId);
905                            }
906    
907                            loadIndexes(CompanyConstants.SYSTEM);
908                    }
909    
910                    private void loadIndexes(long companyId) {
911                            long lastGeneration = getLastGeneration(companyId);
912    
913                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
914                                    return;
915                            }
916    
917                            try {
918                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
919                            }
920                            catch (Exception e) {
921                                    _log.error(
922                                            "Unable to load indexes for company " + companyId, e);
923                            }
924                    }
925    
926            }
927    
928            private class LoadIndexClusterResponseCallback
929                    extends BaseClusterResponseCallback {
930    
931                    public LoadIndexClusterResponseCallback(
932                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
933                            long localLastGeneration) {
934    
935                            _indexAccessor = indexAccessor;
936                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
937                            _localLastGeneration = localLastGeneration;
938    
939                            _companyId = _indexAccessor.getCompanyId();
940                    }
941    
942                    @Override
943                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
944                            Address bootupAddress = null;
945    
946                            do {
947                                    _clusterNodeAddressesCount--;
948    
949                                    ClusterNodeResponse clusterNodeResponse = null;
950    
951                                    try {
952                                            clusterNodeResponse = blockingQueue.poll(
953                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
954                                                    TimeUnit.MILLISECONDS);
955                                    }
956                                    catch (Exception e) {
957                                            _log.error("Unable to get cluster node response", e);
958                                    }
959    
960                                    if (clusterNodeResponse == null) {
961                                            if (_log.isDebugEnabled()) {
962                                                    _log.debug(
963                                                            "Unable to get cluster node response in " +
964                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
965                                                                            TimeUnit.MILLISECONDS);
966                                            }
967    
968                                            continue;
969                                    }
970    
971                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
972    
973                                    if (clusterNode.getPort() > 0) {
974                                            try {
975                                                    long remoteLastGeneration =
976                                                            (Long)clusterNodeResponse.getResult();
977    
978                                                    if (remoteLastGeneration > _localLastGeneration) {
979                                                            bootupAddress = clusterNodeResponse.getAddress();
980    
981                                                            break;
982                                                    }
983                                            }
984                                            catch (Exception e) {
985                                                    if (_log.isDebugEnabled()) {
986                                                            _log.debug(
987                                                                    "Suppress exception caused by remote method " +
988                                                                            "invocation",
989                                                                    e);
990                                                    }
991    
992                                                    continue;
993                                            }
994                                    }
995                                    else {
996                                            if (_log.isDebugEnabled()) {
997                                                    _log.debug(
998                                                            "Cluster node " + clusterNode +
999                                                                    " has invalid port");
1000                                            }
1001                                    }
1002                            }
1003                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1004    
1005                            if (bootupAddress == null) {
1006                                    return;
1007                            }
1008    
1009                            if (_log.isInfoEnabled()) {
1010                                    _log.info(
1011                                            "Start loading lucene index files from cluster node " +
1012                                                    bootupAddress);
1013                            }
1014    
1015                            InputStream inputStream = null;
1016    
1017                            try {
1018                                    inputStream = getLoadIndexesInputStreamFromCluster(
1019                                            _companyId, bootupAddress);
1020    
1021                                    _indexAccessor.loadIndex(inputStream);
1022    
1023                                    if (_log.isInfoEnabled()) {
1024                                            _log.info("Lucene index files loaded successfully");
1025                                    }
1026                            }
1027                            catch (Exception e) {
1028                                    _log.error("Unable to load index for company " + _companyId, e);
1029                            }
1030                            finally {
1031                                    if (inputStream != null) {
1032                                            try {
1033                                                    inputStream.close();
1034                                            }
1035                                            catch (IOException ioe) {
1036                                                    _log.error(
1037                                                            "Unable to close input stream for company " +
1038                                                                    _companyId,
1039                                                            ioe);
1040                                            }
1041                                    }
1042                            }
1043                    }
1044    
1045                    @Override
1046                    public void processTimeoutException(TimeoutException timeoutException) {
1047                            _log.error(
1048                                    "Unable to load index for company " + _companyId,
1049                                    timeoutException);
1050                    }
1051    
1052                    private int _clusterNodeAddressesCount;
1053                    private long _companyId;
1054                    private IndexAccessor _indexAccessor;
1055                    private long _localLastGeneration;
1056    
1057            }
1058    
1059    }