001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLink;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.messaging.Destination;
035    import com.liferay.portal.kernel.messaging.MessageBus;
036    import com.liferay.portal.kernel.messaging.MessageBusUtil;
037    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038    import com.liferay.portal.kernel.search.BooleanClauseOccur;
039    import com.liferay.portal.kernel.search.Field;
040    import com.liferay.portal.kernel.search.SearchEngineUtil;
041    import com.liferay.portal.kernel.util.ArrayUtil;
042    import com.liferay.portal.kernel.util.BasePortalLifecycle;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.MethodHandler;
045    import com.liferay.portal.kernel.util.MethodKey;
046    import com.liferay.portal.kernel.util.ObjectValuePair;
047    import com.liferay.portal.kernel.util.PortalLifecycle;
048    import com.liferay.portal.kernel.util.PropsKeys;
049    import com.liferay.portal.kernel.util.StringBundler;
050    import com.liferay.portal.kernel.util.StringPool;
051    import com.liferay.portal.kernel.util.StringUtil;
052    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053    import com.liferay.portal.kernel.util.Validator;
054    import com.liferay.portal.model.CompanyConstants;
055    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057    import com.liferay.portal.security.auth.TransientTokenUtil;
058    import com.liferay.portal.util.PortalInstances;
059    import com.liferay.portal.util.PortalUtil;
060    import com.liferay.portal.util.PropsUtil;
061    import com.liferay.portal.util.PropsValues;
062    import com.liferay.util.lucene.KeywordsUtil;
063    
064    import java.io.IOException;
065    import java.io.InputStream;
066    import java.io.OutputStream;
067    
068    import java.net.InetAddress;
069    import java.net.URL;
070    import java.net.URLConnection;
071    
072    import java.util.HashSet;
073    import java.util.List;
074    import java.util.Map;
075    import java.util.Set;
076    import java.util.concurrent.BlockingQueue;
077    import java.util.concurrent.ConcurrentHashMap;
078    import java.util.concurrent.CountDownLatch;
079    import java.util.concurrent.TimeUnit;
080    
081    import org.apache.commons.lang.time.StopWatch;
082    import org.apache.lucene.analysis.Analyzer;
083    import org.apache.lucene.analysis.TokenStream;
084    import org.apache.lucene.document.Document;
085    import org.apache.lucene.index.IndexReader;
086    import org.apache.lucene.index.Term;
087    import org.apache.lucene.queryParser.ParseException;
088    import org.apache.lucene.queryParser.QueryParser;
089    import org.apache.lucene.search.BooleanClause;
090    import org.apache.lucene.search.BooleanQuery;
091    import org.apache.lucene.search.IndexSearcher;
092    import org.apache.lucene.search.NumericRangeQuery;
093    import org.apache.lucene.search.Query;
094    import org.apache.lucene.search.TermQuery;
095    import org.apache.lucene.search.TermRangeQuery;
096    import org.apache.lucene.search.WildcardQuery;
097    import org.apache.lucene.search.highlight.Formatter;
098    import org.apache.lucene.search.highlight.Highlighter;
099    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
100    import org.apache.lucene.search.highlight.QueryScorer;
101    import org.apache.lucene.search.highlight.SimpleFragmenter;
102    import org.apache.lucene.search.highlight.WeightedTerm;
103    import org.apache.lucene.util.Version;
104    
105    /**
106     * @author Brian Wing Shun Chan
107     * @author Harry Mark
108     * @author Bruno Farache
109     * @author Shuyang Zhou
110     * @author Tina Tian
111     * @author Hugo Huijser
112     * @author Andrea Di Giorgi
113     */
114    public class LuceneHelperImpl implements LuceneHelper {
115    
116            @Override
117            public void addDocument(long companyId, Document document)
118                    throws IOException {
119    
120                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
121    
122                    indexAccessor.addDocument(document);
123            }
124    
125            @Override
126            public void addExactTerm(
127                    BooleanQuery booleanQuery, String field, String value) {
128    
129                    addTerm(booleanQuery, field, value, false);
130            }
131    
132            @Override
133            public void addNumericRangeTerm(
134                    BooleanQuery booleanQuery, String field, Integer startValue,
135                    Integer endValue) {
136    
137                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
138                            field, startValue, endValue, true, true);
139    
140                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
141            }
142    
143            @Override
144            public void addNumericRangeTerm(
145                    BooleanQuery booleanQuery, String field, Long startValue,
146                    Long endValue) {
147    
148                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
149                            field, startValue, endValue, true, true);
150    
151                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
152            }
153    
154            /**
155             * @deprecated As of 6.2.0, replaced by {@link
156             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
157             */
158            @Override
159            public void addNumericRangeTerm(
160                    BooleanQuery booleanQuery, String field, String startValue,
161                    String endValue) {
162    
163                    addNumericRangeTerm(
164                            booleanQuery, field, GetterUtil.getLong(startValue),
165                            GetterUtil.getLong(endValue));
166            }
167    
168            @Override
169            public void addRangeTerm(
170                    BooleanQuery booleanQuery, String field, String startValue,
171                    String endValue) {
172    
173                    boolean includesLower = true;
174    
175                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
176                            includesLower = false;
177                    }
178    
179                    boolean includesUpper = true;
180    
181                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
182                            includesUpper = false;
183                    }
184    
185                    TermRangeQuery termRangeQuery = new TermRangeQuery(
186                            field, startValue, endValue, includesLower, includesUpper);
187    
188                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
189            }
190    
191            @Override
192            public void addRequiredTerm(
193                    BooleanQuery booleanQuery, String field, String value, boolean like) {
194    
195                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
196            }
197    
198            @Override
199            public void addRequiredTerm(
200                    BooleanQuery booleanQuery, String field, String[] values,
201                    boolean like) {
202    
203                    if (values == null) {
204                            return;
205                    }
206    
207                    BooleanQuery query = new BooleanQuery();
208    
209                    for (String value : values) {
210                            addTerm(query, field, value, like);
211                    }
212    
213                    booleanQuery.add(query, BooleanClause.Occur.MUST);
214            }
215    
216            @Override
217            public void addTerm(
218                    BooleanQuery booleanQuery, String field, String value, boolean like) {
219    
220                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
221            }
222    
223            @Override
224            public void addTerm(
225                    BooleanQuery booleanQuery, String field, String value, boolean like,
226                    BooleanClauseOccur booleanClauseOccur) {
227    
228                    if (Validator.isNull(value)) {
229                            return;
230                    }
231    
232                    Analyzer analyzer = getAnalyzer();
233    
234                    if (!like) {
235                            like = isLikeField(field);
236                    }
237    
238                    if (like) {
239                            value = StringUtil.replace(
240                                    value, StringPool.PERCENT, StringPool.BLANK);
241                    }
242    
243                    try {
244                            QueryParser queryParser = new QueryParser(
245                                    getVersion(), field, analyzer);
246    
247                            value = StringUtil.replace(
248                                    value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
249    
250                            Query query = parseQuery(queryParser, value);
251    
252                            BooleanClause.Occur occur = null;
253    
254                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
255                                    occur = BooleanClause.Occur.MUST;
256                            }
257                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
258                                    occur = BooleanClause.Occur.MUST_NOT;
259                            }
260                            else {
261                                    occur = BooleanClause.Occur.SHOULD;
262                            }
263    
264                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
265                    }
266                    catch (BooleanQuery.TooManyClauses tmc) {
267                            _log.error(
268                                    "The value in the portal property \"" +
269                                            PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE +
270                                                    "\" is too small",
271                                    tmc);
272                    }
273                    catch (Exception e) {
274                            if (_log.isWarnEnabled()) {
275                                    _log.warn(e, e);
276                            }
277                    }
278            }
279    
280            @Override
281            public void addTerm(
282                    BooleanQuery booleanQuery, String field, String[] values,
283                    boolean like) {
284    
285                    for (String value : values) {
286                            addTerm(booleanQuery, field, value, like);
287                    }
288            }
289    
290            /**
291             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
292             *             IndexSearcher)}
293             */
294            @Deprecated
295            @Override
296            public void cleanUp(IndexSearcher indexSearcher) {
297                    if (indexSearcher == null) {
298                            return;
299                    }
300    
301                    try {
302                            indexSearcher.close();
303    
304                            IndexReader indexReader = indexSearcher.getIndexReader();
305    
306                            if (indexReader != null) {
307                                    indexReader.close();
308                            }
309                    }
310                    catch (IOException ioe) {
311                            _log.error(ioe, ioe);
312                    }
313            }
314    
315            @Override
316            public int countScoredFieldNames(Query query, String[] filedNames) {
317                    int count = 0;
318    
319                    for (String fieldName : filedNames) {
320                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
321                                    query, false, fieldName);
322    
323                            if ((weightedTerms.length > 0) &&
324                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
325    
326                                    count++;
327                            }
328                    }
329    
330                    return count;
331            }
332    
333            @Override
334            public void delete(long companyId) {
335                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
336    
337                    if (indexAccessor == null) {
338                            return;
339                    }
340    
341                    indexAccessor.delete();
342            }
343    
344            @Override
345            public void deleteDocuments(long companyId, Term term) throws IOException {
346                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347    
348                    if (indexAccessor == null) {
349                            return;
350                    }
351    
352                    indexAccessor.deleteDocuments(term);
353            }
354    
355            @Override
356            public void dumpIndex(long companyId, OutputStream outputStream)
357                    throws IOException {
358    
359                    long lastGeneration = getLastGeneration(companyId);
360    
361                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
362                            if (_log.isDebugEnabled()) {
363                                    _log.debug(
364                                            "Dump index from cluster is not enabled for " + companyId);
365                            }
366    
367                            return;
368                    }
369    
370                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
371    
372                    if (indexAccessor == null) {
373                            return;
374                    }
375    
376                    indexAccessor.dumpIndex(outputStream);
377            }
378    
379            @Override
380            public Analyzer getAnalyzer() {
381                    if (!PropsValues.INDEX_PORTAL_FIELD_ANALYZER_ENABLED) {
382                            return _keywordAnalyzer;
383                    }
384    
385                    return _analyzer;
386            }
387    
388            @Override
389            public IndexAccessor getIndexAccessor(long companyId) {
390                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
391    
392                    if (indexAccessor != null) {
393                            return indexAccessor;
394                    }
395    
396                    synchronized (this) {
397                            indexAccessor = _indexAccessors.get(companyId);
398    
399                            if (indexAccessor == null) {
400                                    indexAccessor = new IndexAccessorImpl(companyId);
401    
402                                    if (isLoadIndexFromClusterEnabled()) {
403                                            indexAccessor = new SynchronizedIndexAccessorImpl(
404                                                    indexAccessor);
405    
406                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
407                                                    MessageValuesThreadLocal.getValue(
408                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
409    
410                                            if (clusterForwardMessage) {
411                                                    if (_log.isInfoEnabled()) {
412                                                            _log.info(
413                                                                    "Skip Luncene index files cluster loading " +
414                                                                            "since this is a manual reindex request");
415                                                    }
416                                            }
417                                            else {
418                                                    try {
419                                                            _loadIndexFromCluster(
420                                                                    indexAccessor,
421                                                                    indexAccessor.getLastGeneration());
422                                                    }
423                                                    catch (Exception e) {
424                                                            _log.error(
425                                                                    "Unable to load index for company " +
426                                                                            indexAccessor.getCompanyId(),
427                                                                    e);
428                                                    }
429                                            }
430                                    }
431    
432                                    _indexAccessors.put(companyId, indexAccessor);
433                            }
434                    }
435    
436                    return indexAccessor;
437            }
438    
439            @Override
440            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
441                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
442    
443                    return indexAccessor.acquireIndexSearcher();
444            }
445    
446            @Override
447            public long getLastGeneration(long companyId) {
448                    if (!isLoadIndexFromClusterEnabled()) {
449                            return IndexAccessor.DEFAULT_LAST_GENERATION;
450                    }
451    
452                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
453    
454                    if (indexAccessor == null) {
455                            return IndexAccessor.DEFAULT_LAST_GENERATION;
456                    }
457    
458                    return indexAccessor.getLastGeneration();
459            }
460    
461            @Override
462            public InputStream getLoadIndexesInputStreamFromCluster(
463                            long companyId, Address bootupAddress)
464                    throws SystemException {
465    
466                    if (!isLoadIndexFromClusterEnabled()) {
467                            return null;
468                    }
469    
470                    InputStream inputStream = null;
471    
472                    try {
473                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
474                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
475    
476                            URL url = bootupClusterNodeObjectValuePair.getValue();
477    
478                            URLConnection urlConnection = url.openConnection();
479    
480                            urlConnection.setDoOutput(true);
481    
482                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
483                                    urlConnection.getOutputStream());
484    
485                            unsyncPrintWriter.write("transientToken=");
486                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
487                            unsyncPrintWriter.write("&companyId=");
488                            unsyncPrintWriter.write(String.valueOf(companyId));
489    
490                            unsyncPrintWriter.close();
491    
492                            inputStream = urlConnection.getInputStream();
493    
494                            return inputStream;
495                    }
496                    catch (IOException ioe) {
497                            throw new SystemException(ioe);
498                    }
499            }
500    
501            @Override
502            public Set<String> getQueryTerms(Query query) {
503                    String queryString = StringUtil.replace(
504                            query.toString(), StringPool.STAR, StringPool.BLANK);
505    
506                    Query tempQuery = null;
507    
508                    try {
509                            QueryParser queryParser = new QueryParser(
510                                    getVersion(), StringPool.BLANK, getAnalyzer());
511    
512                            tempQuery = parseQuery(queryParser, queryString);
513                    }
514                    catch (Exception e) {
515                            if (_log.isWarnEnabled()) {
516                                    _log.warn("Unable to parse " + queryString);
517                            }
518    
519                            tempQuery = query;
520                    }
521    
522                    WeightedTerm[] weightedTerms = null;
523    
524                    for (String fieldName : Field.KEYWORDS) {
525                            weightedTerms = QueryTermExtractor.getTerms(
526                                    tempQuery, false, fieldName);
527    
528                            if (weightedTerms.length > 0) {
529                                    break;
530                            }
531                    }
532    
533                    Set<String> queryTerms = new HashSet<String>();
534    
535                    for (WeightedTerm weightedTerm : weightedTerms) {
536                            queryTerms.add(weightedTerm.getTerm());
537                    }
538    
539                    return queryTerms;
540            }
541    
542            /**
543             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
544             */
545            @Deprecated
546            @Override
547            public IndexSearcher getSearcher(long companyId, boolean readOnly)
548                    throws IOException {
549    
550                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
551    
552                    IndexReader indexReader = IndexReader.open(
553                            indexAccessor.getLuceneDir(), readOnly);
554    
555                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
556    
557                    indexSearcher.setDefaultFieldSortScoring(true, false);
558                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
559    
560                    return indexSearcher;
561            }
562    
563            @Override
564            public String getSnippet(
565                            Query query, String field, String s, int maxNumFragments,
566                            int fragmentLength, String fragmentSuffix, Formatter formatter)
567                    throws IOException {
568    
569                    QueryScorer queryScorer = new QueryScorer(query, field);
570    
571                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
572    
573                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
574    
575                    TokenStream tokenStream = getAnalyzer().tokenStream(
576                            field, new UnsyncStringReader(s));
577    
578                    try {
579                            String snippet = highlighter.getBestFragments(
580                                    tokenStream, s, maxNumFragments, fragmentSuffix);
581    
582                            if (Validator.isNotNull(snippet) &&
583                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
584                                    !s.equals(snippet)) {
585    
586                                    snippet = snippet.concat(fragmentSuffix);
587                            }
588    
589                            return snippet;
590                    }
591                    catch (InvalidTokenOffsetsException itoe) {
592                            throw new IOException(itoe.getMessage());
593                    }
594            }
595    
596            @Override
597            public Version getVersion() {
598                    return _version;
599            }
600    
601            @Override
602            public boolean isLoadIndexFromClusterEnabled() {
603                    if (PropsValues.CLUSTER_LINK_ENABLED &&
604                            PropsValues.LUCENE_REPLICATE_WRITE) {
605    
606                            return true;
607                    }
608    
609                    if (_log.isDebugEnabled()) {
610                            _log.debug("Load index from cluster is not enabled");
611                    }
612    
613                    return false;
614            }
615    
616            public boolean isLikeField(String field) {
617                    if (_analyzer instanceof PerFieldAnalyzer) {
618                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)_analyzer;
619    
620                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
621    
622                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
623                                    return true;
624                            }
625                    }
626    
627                    return false;
628            }
629    
630            @Override
631            public void loadIndex(long companyId, InputStream inputStream)
632                    throws IOException {
633    
634                    if (!isLoadIndexFromClusterEnabled()) {
635                            return;
636                    }
637    
638                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
639    
640                    if (indexAccessor == null) {
641                            if (_log.isInfoEnabled()) {
642                                    _log.info(
643                                            "Skip loading Lucene index files for company " + companyId +
644                                                    " in favor of lazy loading");
645                            }
646    
647                            return;
648                    }
649    
650                    StopWatch stopWatch = new StopWatch();
651    
652                    stopWatch.start();
653    
654                    if (_log.isInfoEnabled()) {
655                            _log.info(
656                                    "Start loading Lucene index files for company " + companyId);
657                    }
658    
659                    indexAccessor.loadIndex(inputStream);
660    
661                    if (_log.isInfoEnabled()) {
662                            _log.info(
663                                    "Finished loading index files for company " + companyId +
664                                            " in " + stopWatch.getTime() + " ms");
665                    }
666            }
667    
668            @Override
669            public void loadIndexesFromCluster(long companyId) throws SystemException {
670                    if (!isLoadIndexFromClusterEnabled()) {
671                            return;
672                    }
673    
674                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
675    
676                    if (indexAccessor == null) {
677                            return;
678                    }
679    
680                    long localLastGeneration = getLastGeneration(companyId);
681    
682                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
683            }
684    
685            @Override
686            public void releaseIndexSearcher(
687                            long companyId, IndexSearcher indexSearcher)
688                    throws IOException {
689    
690                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
691    
692                    indexAccessor.releaseIndexSearcher(indexSearcher);
693            }
694    
695            public void setAnalyzer(Analyzer analyzer) {
696                    _analyzer = analyzer;
697            }
698    
699            public void setKeywordAnalyzer(Analyzer keywordAnalyzer) {
700                    _keywordAnalyzer = keywordAnalyzer;
701            }
702    
703            public void setVersion(Version version) {
704                    _version = version;
705            }
706    
707            @Override
708            public void shutdown() {
709                    if (_luceneIndexThreadPoolExecutor != null) {
710                            _luceneIndexThreadPoolExecutor.shutdownNow();
711    
712                            try {
713                                    _luceneIndexThreadPoolExecutor.awaitTermination(
714                                            60, TimeUnit.SECONDS);
715                            }
716                            catch (InterruptedException ie) {
717                                    _log.error("Lucene indexer shutdown interrupted", ie);
718                            }
719                    }
720    
721                    if (isLoadIndexFromClusterEnabled()) {
722                            ClusterExecutorUtil.removeClusterEventListener(
723                                    _loadIndexClusterEventListener);
724                    }
725    
726                    MessageBus messageBus = MessageBusUtil.getMessageBus();
727    
728                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
729                            String searchWriterDestinationName =
730                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
731    
732                            Destination searchWriteDestination = messageBus.getDestination(
733                                    searchWriterDestinationName);
734    
735                            if (searchWriteDestination != null) {
736                                    ThreadPoolExecutor threadPoolExecutor =
737                                            PortalExecutorManagerUtil.getPortalExecutor(
738                                                    searchWriterDestinationName);
739    
740                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
741    
742                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
743    
744                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
745                                            countDownLatch);
746    
747                                    for (int i = 0; i < maxPoolSize; i++) {
748                                            threadPoolExecutor.submit(shutdownSyncJob);
749                                    }
750    
751                                    try {
752                                            countDownLatch.await();
753                                    }
754                                    catch (InterruptedException ie) {
755                                            _log.error("Shutdown waiting interrupted", ie);
756                                    }
757    
758                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
759    
760                                    if (_log.isDebugEnabled()) {
761                                            _log.debug(
762                                                    "Cancelled appending indexing jobs: " + runnables);
763                                    }
764    
765                                    searchWriteDestination.close(true);
766                            }
767                    }
768    
769                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
770                            indexAccessor.close();
771                    }
772            }
773    
774            @Override
775            public void shutdown(long companyId) {
776                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
777    
778                    _indexAccessors.remove(indexAccessor);
779    
780                    indexAccessor.close();
781            }
782    
783            @Override
784            public void startup(long companyId) {
785                    if (!PropsValues.INDEX_ON_STARTUP) {
786                            if (PropsValues.LUCENE_CLUSTER_INDEX_LOADING_ON_STARTUP) {
787                                    getIndexAccessor(companyId);
788                            }
789    
790                            return;
791                    }
792    
793                    if (_log.isInfoEnabled()) {
794                            _log.info("Indexing Lucene on startup");
795                    }
796    
797                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
798    
799                    if (PropsValues.INDEX_WITH_THREAD) {
800                            if (_luceneIndexThreadPoolExecutor == null) {
801    
802                                    // This should never be null except for the case where
803                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
804                                    // PropsValues#INDEX_ON_STARTUP to true.
805    
806                                    _luceneIndexThreadPoolExecutor =
807                                            PortalExecutorManagerUtil.getPortalExecutor(
808                                                    LuceneHelperImpl.class.getName());
809                            }
810    
811                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
812                    }
813                    else {
814                            luceneIndexer.reindex();
815                    }
816            }
817    
818            @Override
819            public void updateDocument(long companyId, Term term, Document document)
820                    throws IOException {
821    
822                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
823    
824                    indexAccessor.updateDocument(term, document);
825            }
826    
827            protected Query parseQuery(QueryParser queryParser, String queryString)
828                    throws ParseException {
829    
830                    try {
831                            return queryParser.parse(queryString);
832                    }
833                    catch (ParseException e) {
834                            return queryParser.parse(KeywordsUtil.escape(queryString));
835                    }
836            }
837    
838            private LuceneHelperImpl() {
839                    if ((PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) ||
840                            isLoadIndexFromClusterEnabled()) {
841    
842                            _luceneIndexThreadPoolExecutor =
843                                    PortalExecutorManagerUtil.getPortalExecutor(
844                                            LuceneHelperImpl.class.getName());
845                    }
846    
847                    if (isLoadIndexFromClusterEnabled()) {
848                            _loadIndexClusterEventListener =
849                                    new LoadIndexClusterEventListener();
850    
851                            ClusterExecutorUtil.addClusterEventListener(
852                                    _loadIndexClusterEventListener);
853    
854                            CompanyInitalizedListener companyInitalizedListener =
855                                    new CompanyInitalizedListener();
856    
857                            companyInitalizedListener.registerPortalLifecycle(
858                                    PortalLifecycle.METHOD_INIT);
859                    }
860    
861                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
862            }
863    
864            private ObjectValuePair<String, URL>
865                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
866                    throws SystemException {
867    
868                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
869                            new MethodHandler(
870                                    _createTokenMethodKey,
871                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
872                            bootupAddress);
873    
874                    FutureClusterResponses futureClusterResponses =
875                            ClusterExecutorUtil.execute(clusterRequest);
876    
877                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
878                            futureClusterResponses.getPartialResults();
879    
880                    try {
881                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
882                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
883                                    TimeUnit.MILLISECONDS);
884    
885                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
886    
887                            int port = clusterNode.getPort();
888    
889                            if (port <= 0) {
890                                    StringBundler sb = new StringBundler(6);
891    
892                                    sb.append("Invalid cluster node port ");
893                                    sb.append(port);
894                                    sb.append(". The port is set by the first request or ");
895                                    sb.append("configured in portal.properties by the properties ");
896                                    sb.append("\"portal.instance.http.port\" and ");
897                                    sb.append("\"portal.instance.https.port\".");
898    
899                                    throw new Exception(sb.toString());
900                            }
901    
902                            String protocol = clusterNode.getPortalProtocol();
903    
904                            if (Validator.isNull(protocol)) {
905                                    StringBundler sb = new StringBundler(4);
906    
907                                    sb.append("Cluster node protocol is empty. The protocol is ");
908                                    sb.append("set by the first request or configured in ");
909                                    sb.append("portal.properties by the property ");
910                                    sb.append("\"portal.instance.protocol\"");
911    
912                                    throw new Exception(sb.toString());
913                            }
914    
915                            InetAddress inetAddress = clusterNode.getInetAddress();
916    
917                            String hostName = null;
918    
919                            if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
920                                    hostName = inetAddress.getCanonicalHostName();
921                            }
922                            else {
923                                    hostName = inetAddress.getHostAddress();
924                            }
925    
926                            String fileName = PortalUtil.getPathContext();
927    
928                            if (!fileName.endsWith(StringPool.SLASH)) {
929                                    fileName = fileName.concat(StringPool.SLASH);
930                            }
931    
932                            fileName = fileName.concat("lucene/dump");
933    
934                            URL url = new URL(protocol, hostName, port, fileName);
935    
936                            String transientToken = (String)clusterNodeResponse.getResult();
937    
938                            return new ObjectValuePair<String, URL>(transientToken, url);
939                    }
940                    catch (Exception e) {
941                            throw new SystemException(e);
942                    }
943            }
944    
945            private void _handleFutureClusterResponses(
946                    FutureClusterResponses futureClusterResponses,
947                    IndexAccessor indexAccessor, int clusterNodeAddressesCount,
948                    long localLastGeneration) {
949    
950                    BlockingQueue<ClusterNodeResponse> blockingQueue =
951                            futureClusterResponses.getPartialResults();
952    
953                    long companyId = indexAccessor.getCompanyId();
954    
955                    Address bootupAddress = null;
956    
957                    do {
958                            clusterNodeAddressesCount--;
959    
960                            ClusterNodeResponse clusterNodeResponse = null;
961    
962                            try {
963                                    clusterNodeResponse = blockingQueue.poll(
964                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
965                                            TimeUnit.MILLISECONDS);
966                            }
967                            catch (Exception e) {
968                                    _log.error("Unable to get cluster node response", e);
969                            }
970    
971                            if (clusterNodeResponse == null) {
972                                    if (_log.isDebugEnabled()) {
973                                            _log.debug(
974                                                    "Unable to get cluster node response in " +
975                                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
976                                                                    TimeUnit.MILLISECONDS);
977                                    }
978    
979                                    continue;
980                            }
981    
982                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
983    
984                            if (clusterNode.getPort() > 0) {
985                                    try {
986                                            long remoteLastGeneration =
987                                                    (Long)clusterNodeResponse.getResult();
988    
989                                            if (remoteLastGeneration > localLastGeneration) {
990                                                    bootupAddress = clusterNodeResponse.getAddress();
991    
992                                                    break;
993                                            }
994                                    }
995                                    catch (Exception e) {
996                                            if (_log.isDebugEnabled()) {
997                                                    _log.debug(
998                                                            "Suppress exception caused by remote method " +
999                                                                    "invocation",
1000                                                            e);
1001                                            }
1002    
1003                                            continue;
1004                                    }
1005                            }
1006                            else {
1007                                    if (_log.isDebugEnabled()) {
1008                                            _log.debug(
1009                                                    "Cluster node " + clusterNode +
1010                                                            " has invalid port");
1011                                    }
1012                            }
1013                    }
1014                    while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
1015    
1016                    if (bootupAddress == null) {
1017                            return;
1018                    }
1019    
1020                    if (_log.isInfoEnabled()) {
1021                            _log.info(
1022                                    "Start loading lucene index files from cluster node " +
1023                                            bootupAddress);
1024                    }
1025    
1026                    InputStream inputStream = null;
1027    
1028                    try {
1029                            inputStream = getLoadIndexesInputStreamFromCluster(
1030                                    companyId, bootupAddress);
1031    
1032                            indexAccessor.loadIndex(inputStream);
1033    
1034                            if (_log.isInfoEnabled()) {
1035                                    _log.info("Lucene index files loaded successfully");
1036                            }
1037                    }
1038                    catch (Exception e) {
1039                            _log.error("Unable to load index for company " + companyId, e);
1040                    }
1041                    finally {
1042                            if (inputStream != null) {
1043                                    try {
1044                                            inputStream.close();
1045                                    }
1046                                    catch (IOException ioe) {
1047                                            _log.error(
1048                                                    "Unable to close input stream for company " +
1049                                                            companyId,
1050                                                    ioe);
1051                                    }
1052                            }
1053                    }
1054            }
1055    
1056            private void _includeIfUnique(
1057                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1058                    Query query, BooleanClause.Occur occur) {
1059    
1060                    if (query instanceof TermQuery) {
1061                            Set<Term> terms = new HashSet<Term>();
1062    
1063                            TermQuery termQuery = (TermQuery)query;
1064    
1065                            termQuery.extractTerms(terms);
1066    
1067                            float boost = termQuery.getBoost();
1068    
1069                            for (Term term : terms) {
1070                                    String termValue = term.text();
1071    
1072                                    if (like) {
1073                                            termValue = termValue.toLowerCase(queryParser.getLocale());
1074    
1075                                            termValue = termValue.concat(StringPool.STAR);
1076    
1077                                            if (PropsValues.INDEX_SEARCH_LEADING_WILDCARD_ENABLED) {
1078                                                    termValue = StringPool.STAR.concat(termValue);
1079                                            }
1080    
1081                                            term = term.createTerm(termValue);
1082    
1083                                            query = new WildcardQuery(term);
1084                                    }
1085                                    else {
1086                                            query = new TermQuery(term);
1087                                    }
1088    
1089                                    query.setBoost(boost);
1090    
1091                                    boolean included = false;
1092    
1093                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1094                                            if (query.equals(booleanClause.getQuery())) {
1095                                                    included = true;
1096                                            }
1097                                    }
1098    
1099                                    if (!included) {
1100                                            booleanQuery.add(query, occur);
1101                                    }
1102                            }
1103                    }
1104                    else if (query instanceof BooleanQuery) {
1105                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
1106    
1107                            BooleanQuery containerBooleanQuery = new BooleanQuery();
1108    
1109                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1110                                    _includeIfUnique(
1111                                            containerBooleanQuery, like, queryParser,
1112                                            booleanClause.getQuery(), booleanClause.getOccur());
1113                            }
1114    
1115                            if (containerBooleanQuery.getClauses().length > 0) {
1116                                    booleanQuery.add(containerBooleanQuery, occur);
1117                            }
1118                    }
1119                    else {
1120                            boolean included = false;
1121    
1122                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1123                                    if (query.equals(booleanClause.getQuery())) {
1124                                            included = true;
1125                                    }
1126                            }
1127    
1128                            if (!included) {
1129                                    booleanQuery.add(query, occur);
1130                            }
1131                    }
1132            }
1133    
1134            private void _loadIndexFromCluster(
1135                            IndexAccessor indexAccessor, long localLastGeneration)
1136                    throws SystemException {
1137    
1138                    List<Address> clusterNodeAddresses =
1139                            ClusterExecutorUtil.getClusterNodeAddresses();
1140    
1141                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
1142    
1143                    if (clusterNodeAddressesCount <= 1) {
1144                            if (_log.isDebugEnabled()) {
1145                                    _log.debug(
1146                                            "Do not load indexes because there is either one portal " +
1147                                                    "instance or no portal instances in the cluster");
1148                            }
1149    
1150                            return;
1151                    }
1152    
1153                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1154                            new MethodHandler(
1155                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1156                            true);
1157    
1158                    FutureClusterResponses futureClusterResponses =
1159                            ClusterExecutorUtil.execute(clusterRequest);
1160    
1161                    _handleFutureClusterResponses(
1162                            futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1163                            localLastGeneration);
1164            }
1165    
        // Timeout, in milliseconds, used when polling for cluster node responses.
        // The same value is also passed as the argument of the remote
        // "createToken" invocation.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Value applied through BooleanQuery.setMaxClauseCount in the
        // constructor. It is read from the portal property, with
        // BooleanQuery.getMaxClauseCount() supplied as the default.

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        // Space padded lower case boolean operators. Not referenced in this part
        // of the file; presumably paired index by index with
        // _KEYWORDS_UPPERCASE. TODO confirm against the usage.

        private static final String[] _KEYWORDS_LOWERCASE = {
                " and ", " not ", " or "
        };

        // Space padded upper case boolean operators, in the same order as
        // _KEYWORDS_LOWERCASE

        private static final String[] _KEYWORDS_UPPERCASE = {
                " AND ", " NOT ", " OR "
        };

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Remote method keys: TransientTokenUtil.createToken(long) is sent to the
        // bootup node and LuceneHelperUtil.getLastGeneration(long) is multicast
        // to the cluster.

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        // Counted down by CompanyInitalizedListener#doPortalInit and awaited by
        // LoadIndexClusterEventListener before it schedules the index loaders

        private final CountDownLatch _countDownLatch = new CountDownLatch(1);

        private Analyzer _analyzer;

        // Index accessors by Long key; shutdown() closes every value. The key is
        // presumably the company ID. Verify against getIndexAccessor.

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private Analyzer _keywordAnalyzer;

        // Only assigned in the constructor when loading the index from the
        // cluster is enabled

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;

        // Created in the constructor or lazily in startup(long); may be null

        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
1198    
1199            private static class ShutdownSyncJob implements Runnable {
1200    
1201                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1202                            _countDownLatch = countDownLatch;
1203                    }
1204    
1205                    @Override
1206                    public void run() {
1207                            _countDownLatch.countDown();
1208    
1209                            try {
1210                                    synchronized (this) {
1211                                            wait();
1212                                    }
1213                            }
1214                            catch (InterruptedException ie) {
1215                            }
1216                    }
1217    
1218                    private final CountDownLatch _countDownLatch;
1219    
1220            }
1221    
1222            private class ClusterIndexLoader implements Runnable {
1223    
1224                    @Override
1225                    public void run() {
1226                            long lastGeneration = getLastGeneration(_companyId);
1227    
1228                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1229                                    return;
1230                            }
1231    
1232                            try {
1233                                    LuceneClusterUtil.loadIndexesFromCluster(_companyId);
1234                            }
1235                            catch (Exception e) {
1236                                    _log.error(
1237                                            "Unable to load indexes for company " + _companyId, e);
1238                            }
1239                    }
1240    
1241                    private ClusterIndexLoader(long companyId) {
1242                            _companyId = companyId;
1243                    }
1244    
1245                    private final long _companyId;
1246    
1247            }
1248    
1249            private class CompanyInitalizedListener extends BasePortalLifecycle {
1250    
1251                    @Override
1252                    protected void doPortalDestroy() throws Exception {
1253                    }
1254    
1255                    @Override
1256                    protected void doPortalInit() throws Exception {
1257                            _countDownLatch.countDown();
1258                    }
1259    
1260            }
1261    
1262            private class LoadIndexClusterEventListener
1263                    implements ClusterEventListener {
1264    
1265                    @Override
1266                    public void processClusterEvent(ClusterEvent clusterEvent) {
1267                            ClusterEventType clusterEventType =
1268                                    clusterEvent.getClusterEventType();
1269    
1270                            if (clusterEventType != ClusterEventType.JOIN) {
1271                                    return;
1272                            }
1273    
1274                            List<Address> clusterNodeAddresses =
1275                                    ClusterExecutorUtil.getClusterNodeAddresses();
1276                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1277    
1278                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1279                                    if (_log.isDebugEnabled()) {
1280                                            _log.debug(
1281                                                    "Number of original cluster members is greater than " +
1282                                                            "one");
1283                                    }
1284    
1285                                    return;
1286                            }
1287    
1288                            try {
1289                                    _countDownLatch.await();
1290                            }
1291                            catch (InterruptedException ie) {
1292                                    _log.error("Latch opened prematurely by interruption");
1293                            }
1294    
1295                            long[] companyIds = PortalInstances.getCompanyIds();
1296    
1297                            for (long companyId : companyIds) {
1298                                    _luceneIndexThreadPoolExecutor.execute(
1299                                            new ClusterIndexLoader(companyId));
1300                            }
1301    
1302                            _luceneIndexThreadPoolExecutor.execute(
1303                                    new ClusterIndexLoader(CompanyConstants.SYSTEM));
1304                    }
1305    
1306            }
1307    
1308    }