001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLink;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.messaging.Destination;
035    import com.liferay.portal.kernel.messaging.MessageBus;
036    import com.liferay.portal.kernel.messaging.MessageBusUtil;
037    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038    import com.liferay.portal.kernel.search.BooleanClauseOccur;
039    import com.liferay.portal.kernel.search.Field;
040    import com.liferay.portal.kernel.search.SearchEngineUtil;
041    import com.liferay.portal.kernel.util.ArrayUtil;
042    import com.liferay.portal.kernel.util.BasePortalLifecycle;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.MethodHandler;
045    import com.liferay.portal.kernel.util.MethodKey;
046    import com.liferay.portal.kernel.util.ObjectValuePair;
047    import com.liferay.portal.kernel.util.PortalLifecycle;
048    import com.liferay.portal.kernel.util.PropsKeys;
049    import com.liferay.portal.kernel.util.StringBundler;
050    import com.liferay.portal.kernel.util.StringPool;
051    import com.liferay.portal.kernel.util.StringUtil;
052    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053    import com.liferay.portal.kernel.util.Validator;
054    import com.liferay.portal.model.CompanyConstants;
055    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057    import com.liferay.portal.security.auth.TransientTokenUtil;
058    import com.liferay.portal.util.PortalInstances;
059    import com.liferay.portal.util.PortalUtil;
060    import com.liferay.portal.util.PropsUtil;
061    import com.liferay.portal.util.PropsValues;
062    import com.liferay.util.lucene.KeywordsUtil;
063    
064    import java.io.IOException;
065    import java.io.InputStream;
066    import java.io.OutputStream;
067    
068    import java.net.InetAddress;
069    import java.net.URL;
070    import java.net.URLConnection;
071    
072    import java.util.HashSet;
073    import java.util.List;
074    import java.util.Map;
075    import java.util.Set;
076    import java.util.concurrent.BlockingQueue;
077    import java.util.concurrent.ConcurrentHashMap;
078    import java.util.concurrent.CountDownLatch;
079    import java.util.concurrent.TimeUnit;
080    
081    import org.apache.commons.lang.time.StopWatch;
082    import org.apache.lucene.analysis.Analyzer;
083    import org.apache.lucene.analysis.TokenStream;
084    import org.apache.lucene.document.Document;
085    import org.apache.lucene.index.IndexReader;
086    import org.apache.lucene.index.Term;
087    import org.apache.lucene.queryParser.ParseException;
088    import org.apache.lucene.queryParser.QueryParser;
089    import org.apache.lucene.search.BooleanClause;
090    import org.apache.lucene.search.BooleanQuery;
091    import org.apache.lucene.search.IndexSearcher;
092    import org.apache.lucene.search.NumericRangeQuery;
093    import org.apache.lucene.search.Query;
094    import org.apache.lucene.search.TermQuery;
095    import org.apache.lucene.search.TermRangeQuery;
096    import org.apache.lucene.search.WildcardQuery;
097    import org.apache.lucene.search.highlight.Formatter;
098    import org.apache.lucene.search.highlight.Highlighter;
099    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
100    import org.apache.lucene.search.highlight.QueryScorer;
101    import org.apache.lucene.search.highlight.SimpleFragmenter;
102    import org.apache.lucene.search.highlight.WeightedTerm;
103    import org.apache.lucene.util.Version;
104    
105    /**
106     * @author Brian Wing Shun Chan
107     * @author Harry Mark
108     * @author Bruno Farache
109     * @author Shuyang Zhou
110     * @author Tina Tian
111     * @author Hugo Huijser
112     * @author Andrea Di Giorgi
113     */
114    public class LuceneHelperImpl implements LuceneHelper {
115    
116            @Override
117            public void addDocument(long companyId, Document document)
118                    throws IOException {
119    
120                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
121    
122                    indexAccessor.addDocument(document);
123            }
124    
125            @Override
126            public void addExactTerm(
127                    BooleanQuery booleanQuery, String field, String value) {
128    
129                    addTerm(booleanQuery, field, value, false);
130            }
131    
132            @Override
133            public void addNumericRangeTerm(
134                    BooleanQuery booleanQuery, String field, Integer startValue,
135                    Integer endValue) {
136    
137                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
138                            field, startValue, endValue, true, true);
139    
140                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
141            }
142    
143            @Override
144            public void addNumericRangeTerm(
145                    BooleanQuery booleanQuery, String field, Long startValue,
146                    Long endValue) {
147    
148                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
149                            field, startValue, endValue, true, true);
150    
151                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
152            }
153    
154            /**
155             * @deprecated As of 6.2.0, replaced by {@link
156             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
157             */
158            @Override
159            public void addNumericRangeTerm(
160                    BooleanQuery booleanQuery, String field, String startValue,
161                    String endValue) {
162    
163                    addNumericRangeTerm(
164                            booleanQuery, field, GetterUtil.getLong(startValue),
165                            GetterUtil.getLong(endValue));
166            }
167    
168            @Override
169            public void addRangeTerm(
170                    BooleanQuery booleanQuery, String field, String startValue,
171                    String endValue) {
172    
173                    boolean includesLower = true;
174    
175                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
176                            includesLower = false;
177                    }
178    
179                    boolean includesUpper = true;
180    
181                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
182                            includesUpper = false;
183                    }
184    
185                    TermRangeQuery termRangeQuery = new TermRangeQuery(
186                            field, startValue, endValue, includesLower, includesUpper);
187    
188                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
189            }
190    
191            @Override
192            public void addRequiredTerm(
193                    BooleanQuery booleanQuery, String field, String value, boolean like) {
194    
195                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
196            }
197    
198            @Override
199            public void addRequiredTerm(
200                    BooleanQuery booleanQuery, String field, String[] values,
201                    boolean like) {
202    
203                    if (values == null) {
204                            return;
205                    }
206    
207                    BooleanQuery query = new BooleanQuery();
208    
209                    for (String value : values) {
210                            addTerm(query, field, value, like);
211                    }
212    
213                    booleanQuery.add(query, BooleanClause.Occur.MUST);
214            }
215    
216            @Override
217            public void addTerm(
218                    BooleanQuery booleanQuery, String field, String value, boolean like) {
219    
220                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
221            }
222    
223            @Override
224            public void addTerm(
225                    BooleanQuery booleanQuery, String field, String value, boolean like,
226                    BooleanClauseOccur booleanClauseOccur) {
227    
228                    if (Validator.isNull(value)) {
229                            return;
230                    }
231    
232                    Analyzer analyzer = getAnalyzer();
233    
234                    if (analyzer instanceof PerFieldAnalyzer) {
235                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
236    
237                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
238    
239                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
240                                    like = true;
241                            }
242                    }
243    
244                    if (like) {
245                            value = StringUtil.replace(
246                                    value, StringPool.PERCENT, StringPool.BLANK);
247                    }
248    
249                    try {
250                            QueryParser queryParser = new QueryParser(
251                                    getVersion(), field, analyzer);
252    
253                            value = StringUtil.replace(
254                                    value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
255    
256                            Query query = parseQuery(queryParser, value);
257    
258                            BooleanClause.Occur occur = null;
259    
260                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
261                                    occur = BooleanClause.Occur.MUST;
262                            }
263                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
264                                    occur = BooleanClause.Occur.MUST_NOT;
265                            }
266                            else {
267                                    occur = BooleanClause.Occur.SHOULD;
268                            }
269    
270                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
271                    }
272                    catch (BooleanQuery.TooManyClauses tmc) {
273                            _log.error(
274                                    "The value in the portal property \"" +
275                                            PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE +
276                                                    "\" is too small",
277                                    tmc);
278                    }
279                    catch (Exception e) {
280                            if (_log.isWarnEnabled()) {
281                                    _log.warn(e, e);
282                            }
283                    }
284            }
285    
286            @Override
287            public void addTerm(
288                    BooleanQuery booleanQuery, String field, String[] values,
289                    boolean like) {
290    
291                    for (String value : values) {
292                            addTerm(booleanQuery, field, value, like);
293                    }
294            }
295    
296            /**
297             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
298             *             IndexSearcher)}
299             */
300            @Deprecated
301            @Override
302            public void cleanUp(IndexSearcher indexSearcher) {
303                    if (indexSearcher == null) {
304                            return;
305                    }
306    
307                    try {
308                            indexSearcher.close();
309    
310                            IndexReader indexReader = indexSearcher.getIndexReader();
311    
312                            if (indexReader != null) {
313                                    indexReader.close();
314                            }
315                    }
316                    catch (IOException ioe) {
317                            _log.error(ioe, ioe);
318                    }
319            }
320    
321            @Override
322            public int countScoredFieldNames(Query query, String[] filedNames) {
323                    int count = 0;
324    
325                    for (String fieldName : filedNames) {
326                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
327                                    query, false, fieldName);
328    
329                            if ((weightedTerms.length > 0) &&
330                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
331    
332                                    count++;
333                            }
334                    }
335    
336                    return count;
337            }
338    
339            @Override
340            public void delete(long companyId) {
341                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342    
343                    if (indexAccessor == null) {
344                            return;
345                    }
346    
347                    indexAccessor.delete();
348            }
349    
350            @Override
351            public void deleteDocuments(long companyId, Term term) throws IOException {
352                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
353    
354                    if (indexAccessor == null) {
355                            return;
356                    }
357    
358                    indexAccessor.deleteDocuments(term);
359            }
360    
361            @Override
362            public void dumpIndex(long companyId, OutputStream outputStream)
363                    throws IOException {
364    
365                    long lastGeneration = getLastGeneration(companyId);
366    
367                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
368                            if (_log.isDebugEnabled()) {
369                                    _log.debug(
370                                            "Dump index from cluster is not enabled for " + companyId);
371                            }
372    
373                            return;
374                    }
375    
376                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
377    
378                    if (indexAccessor == null) {
379                            return;
380                    }
381    
382                    indexAccessor.dumpIndex(outputStream);
383            }
384    
385            @Override
386            public Analyzer getAnalyzer() {
387                    return _analyzer;
388            }
389    
390            @Override
391            public IndexAccessor getIndexAccessor(long companyId) {
392                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
393    
394                    if (indexAccessor != null) {
395                            return indexAccessor;
396                    }
397    
398                    synchronized (this) {
399                            indexAccessor = _indexAccessors.get(companyId);
400    
401                            if (indexAccessor == null) {
402                                    indexAccessor = new IndexAccessorImpl(companyId);
403    
404                                    if (isLoadIndexFromClusterEnabled()) {
405                                            indexAccessor = new SynchronizedIndexAccessorImpl(
406                                                    indexAccessor);
407    
408                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
409                                                    MessageValuesThreadLocal.getValue(
410                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
411    
412                                            if (clusterForwardMessage) {
413                                                    if (_log.isInfoEnabled()) {
414                                                            _log.info(
415                                                                    "Skip Luncene index files cluster loading " +
416                                                                            "since this is a manual reindex request");
417                                                    }
418                                            }
419                                            else {
420                                                    try {
421                                                            _loadIndexFromCluster(
422                                                                    indexAccessor,
423                                                                    indexAccessor.getLastGeneration());
424                                                    }
425                                                    catch (Exception e) {
426                                                            _log.error(
427                                                                    "Unable to load index for company " +
428                                                                            indexAccessor.getCompanyId(),
429                                                                    e);
430                                                    }
431                                            }
432                                    }
433    
434                                    _indexAccessors.put(companyId, indexAccessor);
435                            }
436                    }
437    
438                    return indexAccessor;
439            }
440    
441            @Override
442            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
443                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
444    
445                    return indexAccessor.acquireIndexSearcher();
446            }
447    
448            @Override
449            public long getLastGeneration(long companyId) {
450                    if (!isLoadIndexFromClusterEnabled()) {
451                            return IndexAccessor.DEFAULT_LAST_GENERATION;
452                    }
453    
454                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
455    
456                    if (indexAccessor == null) {
457                            return IndexAccessor.DEFAULT_LAST_GENERATION;
458                    }
459    
460                    return indexAccessor.getLastGeneration();
461            }
462    
463            @Override
464            public InputStream getLoadIndexesInputStreamFromCluster(
465                            long companyId, Address bootupAddress)
466                    throws SystemException {
467    
468                    if (!isLoadIndexFromClusterEnabled()) {
469                            return null;
470                    }
471    
472                    InputStream inputStream = null;
473    
474                    try {
475                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
476                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
477    
478                            URL url = bootupClusterNodeObjectValuePair.getValue();
479    
480                            URLConnection urlConnection = url.openConnection();
481    
482                            urlConnection.setDoOutput(true);
483    
484                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
485                                    urlConnection.getOutputStream());
486    
487                            unsyncPrintWriter.write("transientToken=");
488                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
489                            unsyncPrintWriter.write("&companyId=");
490                            unsyncPrintWriter.write(String.valueOf(companyId));
491    
492                            unsyncPrintWriter.close();
493    
494                            inputStream = urlConnection.getInputStream();
495    
496                            return inputStream;
497                    }
498                    catch (IOException ioe) {
499                            throw new SystemException(ioe);
500                    }
501            }
502    
503            @Override
504            public Set<String> getQueryTerms(Query query) {
505                    String queryString = StringUtil.replace(
506                            query.toString(), StringPool.STAR, StringPool.BLANK);
507    
508                    Query tempQuery = null;
509    
510                    try {
511                            QueryParser queryParser = new QueryParser(
512                                    getVersion(), StringPool.BLANK, getAnalyzer());
513    
514                            tempQuery = parseQuery(queryParser, queryString);
515                    }
516                    catch (Exception e) {
517                            if (_log.isWarnEnabled()) {
518                                    _log.warn("Unable to parse " + queryString);
519                            }
520    
521                            tempQuery = query;
522                    }
523    
524                    WeightedTerm[] weightedTerms = null;
525    
526                    for (String fieldName : Field.KEYWORDS) {
527                            weightedTerms = QueryTermExtractor.getTerms(
528                                    tempQuery, false, fieldName);
529    
530                            if (weightedTerms.length > 0) {
531                                    break;
532                            }
533                    }
534    
535                    Set<String> queryTerms = new HashSet<String>();
536    
537                    for (WeightedTerm weightedTerm : weightedTerms) {
538                            queryTerms.add(weightedTerm.getTerm());
539                    }
540    
541                    return queryTerms;
542            }
543    
544            /**
545             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
546             */
547            @Deprecated
548            @Override
549            public IndexSearcher getSearcher(long companyId, boolean readOnly)
550                    throws IOException {
551    
552                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
553    
554                    IndexReader indexReader = IndexReader.open(
555                            indexAccessor.getLuceneDir(), readOnly);
556    
557                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
558    
559                    indexSearcher.setDefaultFieldSortScoring(true, false);
560                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
561    
562                    return indexSearcher;
563            }
564    
565            @Override
566            public String getSnippet(
567                            Query query, String field, String s, int maxNumFragments,
568                            int fragmentLength, String fragmentSuffix, Formatter formatter)
569                    throws IOException {
570    
571                    QueryScorer queryScorer = new QueryScorer(query, field);
572    
573                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
574    
575                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
576    
577                    TokenStream tokenStream = getAnalyzer().tokenStream(
578                            field, new UnsyncStringReader(s));
579    
580                    try {
581                            String snippet = highlighter.getBestFragments(
582                                    tokenStream, s, maxNumFragments, fragmentSuffix);
583    
584                            if (Validator.isNotNull(snippet) &&
585                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
586                                    !s.equals(snippet)) {
587    
588                                    snippet = snippet.concat(fragmentSuffix);
589                            }
590    
591                            return snippet;
592                    }
593                    catch (InvalidTokenOffsetsException itoe) {
594                            throw new IOException(itoe.getMessage());
595                    }
596            }
597    
598            @Override
599            public Version getVersion() {
600                    return _version;
601            }
602    
603            @Override
604            public boolean isLoadIndexFromClusterEnabled() {
605                    if (PropsValues.CLUSTER_LINK_ENABLED &&
606                            PropsValues.LUCENE_REPLICATE_WRITE) {
607    
608                            return true;
609                    }
610    
611                    if (_log.isDebugEnabled()) {
612                            _log.debug("Load index from cluster is not enabled");
613                    }
614    
615                    return false;
616            }
617    
618            @Override
619            public void loadIndex(long companyId, InputStream inputStream)
620                    throws IOException {
621    
622                    if (!isLoadIndexFromClusterEnabled()) {
623                            return;
624                    }
625    
626                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
627    
628                    if (indexAccessor == null) {
629                            if (_log.isInfoEnabled()) {
630                                    _log.info(
631                                            "Skip loading Lucene index files for company " + companyId +
632                                                    " in favor of lazy loading");
633                            }
634    
635                            return;
636                    }
637    
638                    StopWatch stopWatch = new StopWatch();
639    
640                    stopWatch.start();
641    
642                    if (_log.isInfoEnabled()) {
643                            _log.info(
644                                    "Start loading Lucene index files for company " + companyId);
645                    }
646    
647                    indexAccessor.loadIndex(inputStream);
648    
649                    if (_log.isInfoEnabled()) {
650                            _log.info(
651                                    "Finished loading index files for company " + companyId +
652                                            " in " + stopWatch.getTime() + " ms");
653                    }
654            }
655    
656            @Override
657            public void loadIndexesFromCluster(long companyId) throws SystemException {
658                    if (!isLoadIndexFromClusterEnabled()) {
659                            return;
660                    }
661    
662                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
663    
664                    if (indexAccessor == null) {
665                            return;
666                    }
667    
668                    long localLastGeneration = getLastGeneration(companyId);
669    
670                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
671            }
672    
673            @Override
674            public void releaseIndexSearcher(
675                            long companyId, IndexSearcher indexSearcher)
676                    throws IOException {
677    
678                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
679    
680                    indexAccessor.releaseIndexSearcher(indexSearcher);
681            }
682    
683            public void setAnalyzer(Analyzer analyzer) {
684                    _analyzer = analyzer;
685            }
686    
687            public void setVersion(Version version) {
688                    _version = version;
689            }
690    
        /**
         * Shuts down Lucene indexing on this node.
         *
         * <p>
         * In order: stops the Lucene index thread pool (waiting up to 60 seconds
         * for it to terminate), unregisters the cluster event listener when
         * cluster index loading is enabled, drains and closes the search writer
         * destination of every search engine, and closes every cached index
         * accessor.
         * </p>
         */
        @Override
        public void shutdown() {
                if (_luceneIndexThreadPoolExecutor != null) {

                        // shutdownNow() presumably interrupts running indexer tasks and
                        // discards queued ones (as in the JDK); the boolean result of
                        // awaitTermination is not checked

                        _luceneIndexThreadPoolExecutor.shutdownNow();

                        try {
                                _luceneIndexThreadPoolExecutor.awaitTermination(
                                        60, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException ie) {
                                _log.error("Lucene indexer shutdown interrupted", ie);
                        }
                }

                if (isLoadIndexFromClusterEnabled()) {
                        ClusterExecutorUtil.removeClusterEventListener(
                                _loadIndexClusterEventListener);
                }

                MessageBus messageBus = MessageBusUtil.getMessageBus();

                for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
                        String searchWriterDestinationName =
                                SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);

                        Destination searchWriteDestination = messageBus.getDestination(
                                searchWriterDestinationName);

                        if (searchWriteDestination != null) {

                                // Submit one ShutdownSyncJob per possible worker thread and
                                // wait until all of them are running. Each job counts down
                                // the latch and then blocks, so once the latch reaches zero
                                // every worker is occupied and no other queued indexing job
                                // can start

                                ThreadPoolExecutor threadPoolExecutor =
                                        PortalExecutorManagerUtil.getPortalExecutor(
                                                searchWriterDestinationName);

                                int maxPoolSize = threadPoolExecutor.getMaxPoolSize();

                                CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);

                                ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
                                        countDownLatch);

                                for (int i = 0; i < maxPoolSize; i++) {
                                        threadPoolExecutor.submit(shutdownSyncJob);
                                }

                                try {
                                        countDownLatch.await();
                                }
                                catch (InterruptedException ie) {
                                        _log.error("Shutdown waiting interrupted", ie);
                                }

                                // The jobs returned here never started and are only logged.
                                // shutdownNow() is presumably also what interrupts the
                                // blocked ShutdownSyncJobs so that they can exit

                                List<Runnable> runnables = threadPoolExecutor.shutdownNow();

                                if (_log.isDebugEnabled()) {
                                        _log.debug(
                                                "Cancelled appending indexing jobs: " + runnables);
                                }

                                searchWriteDestination.close(true);
                        }
                }

                // Close every cached accessor; the map itself is not cleared here

                for (IndexAccessor indexAccessor : _indexAccessors.values()) {
                        indexAccessor.close();
                }
        }
757    
758            @Override
759            public void shutdown(long companyId) {
760                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
761    
762                    _indexAccessors.remove(indexAccessor);
763    
764                    indexAccessor.close();
765            }
766    
767            @Override
768            public void startup(long companyId) {
769                    if (!PropsValues.INDEX_ON_STARTUP) {
770                            if (PropsValues.LUCENE_CLUSTER_INDEX_LOADING_ON_STARTUP) {
771                                    getIndexAccessor(companyId);
772                            }
773    
774                            return;
775                    }
776    
777                    if (_log.isInfoEnabled()) {
778                            _log.info("Indexing Lucene on startup");
779                    }
780    
781                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
782    
783                    if (PropsValues.INDEX_WITH_THREAD) {
784                            if (_luceneIndexThreadPoolExecutor == null) {
785    
786                                    // This should never be null except for the case where
787                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
788                                    // PropsValues#INDEX_ON_STARTUP to true.
789    
790                                    _luceneIndexThreadPoolExecutor =
791                                            PortalExecutorManagerUtil.getPortalExecutor(
792                                                    LuceneHelperImpl.class.getName());
793                            }
794    
795                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
796                    }
797                    else {
798                            luceneIndexer.reindex();
799                    }
800            }
801    
802            @Override
803            public void updateDocument(long companyId, Term term, Document document)
804                    throws IOException {
805    
806                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
807    
808                    indexAccessor.updateDocument(term, document);
809            }
810    
811            protected Query parseQuery(QueryParser queryParser, String queryString)
812                    throws ParseException {
813    
814                    try {
815                            return queryParser.parse(queryString);
816                    }
817                    catch (ParseException e) {
818                            return queryParser.parse(KeywordsUtil.escape(queryString));
819                    }
820            }
821    
822            private LuceneHelperImpl() {
823                    if ((PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) ||
824                            isLoadIndexFromClusterEnabled()) {
825    
826                            _luceneIndexThreadPoolExecutor =
827                                    PortalExecutorManagerUtil.getPortalExecutor(
828                                            LuceneHelperImpl.class.getName());
829                    }
830    
831                    if (isLoadIndexFromClusterEnabled()) {
832                            _loadIndexClusterEventListener =
833                                    new LoadIndexClusterEventListener();
834    
835                            ClusterExecutorUtil.addClusterEventListener(
836                                    _loadIndexClusterEventListener);
837    
838                            CompanyInitalizedListener companyInitalizedListener =
839                                    new CompanyInitalizedListener();
840    
841                            companyInitalizedListener.registerPortalLifecycle(
842                                    PortalLifecycle.METHOD_INIT);
843                    }
844    
845                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
846            }
847    
848            private ObjectValuePair<String, URL>
849                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
850                    throws SystemException {
851    
852                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
853                            new MethodHandler(
854                                    _createTokenMethodKey,
855                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
856                            bootupAddress);
857    
858                    FutureClusterResponses futureClusterResponses =
859                            ClusterExecutorUtil.execute(clusterRequest);
860    
861                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
862                            futureClusterResponses.getPartialResults();
863    
864                    try {
865                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
866                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
867                                    TimeUnit.MILLISECONDS);
868    
869                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
870    
871                            int port = clusterNode.getPort();
872    
873                            if (port <= 0) {
874                                    StringBundler sb = new StringBundler(6);
875    
876                                    sb.append("Invalid cluster node port ");
877                                    sb.append(port);
878                                    sb.append(". The port is set by the first request or ");
879                                    sb.append("configured in portal.properties by the properties ");
880                                    sb.append("\"portal.instance.http.port\" and ");
881                                    sb.append("\"portal.instance.https.port\".");
882    
883                                    throw new Exception(sb.toString());
884                            }
885    
886                            String protocol = clusterNode.getPortalProtocol();
887    
888                            if (Validator.isNull(protocol)) {
889                                    StringBundler sb = new StringBundler(4);
890    
891                                    sb.append("Cluster node protocol is empty. The protocol is ");
892                                    sb.append("set by the first request or configured in ");
893                                    sb.append("portal.properties by the property ");
894                                    sb.append("\"portal.instance.protocol\"");
895    
896                                    throw new Exception(sb.toString());
897                            }
898    
899                            InetAddress inetAddress = clusterNode.getInetAddress();
900    
901                            String hostName = null;
902    
903                            if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
904                                    hostName = inetAddress.getCanonicalHostName();
905                            }
906                            else {
907                                    hostName = inetAddress.getHostAddress();
908                            }
909    
910                            String fileName = PortalUtil.getPathContext();
911    
912                            if (!fileName.endsWith(StringPool.SLASH)) {
913                                    fileName = fileName.concat(StringPool.SLASH);
914                            }
915    
916                            fileName = fileName.concat("lucene/dump");
917    
918                            URL url = new URL(protocol, hostName, port, fileName);
919    
920                            String transientToken = (String)clusterNodeResponse.getResult();
921    
922                            return new ObjectValuePair<String, URL>(transientToken, url);
923                    }
924                    catch (Exception e) {
925                            throw new SystemException(e);
926                    }
927            }
928    
929            private void _handleFutureClusterResponses(
930                    FutureClusterResponses futureClusterResponses,
931                    IndexAccessor indexAccessor, int clusterNodeAddressesCount,
932                    long localLastGeneration) {
933    
934                    BlockingQueue<ClusterNodeResponse> blockingQueue =
935                            futureClusterResponses.getPartialResults();
936    
937                    long companyId = indexAccessor.getCompanyId();
938    
939                    Address bootupAddress = null;
940    
941                    do {
942                            clusterNodeAddressesCount--;
943    
944                            ClusterNodeResponse clusterNodeResponse = null;
945    
946                            try {
947                                    clusterNodeResponse = blockingQueue.poll(
948                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
949                                            TimeUnit.MILLISECONDS);
950                            }
951                            catch (Exception e) {
952                                    _log.error("Unable to get cluster node response", e);
953                            }
954    
955                            if (clusterNodeResponse == null) {
956                                    if (_log.isDebugEnabled()) {
957                                            _log.debug(
958                                                    "Unable to get cluster node response in " +
959                                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
960                                                                    TimeUnit.MILLISECONDS);
961                                    }
962    
963                                    continue;
964                            }
965    
966                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
967    
968                            if (clusterNode.getPort() > 0) {
969                                    try {
970                                            long remoteLastGeneration =
971                                                    (Long)clusterNodeResponse.getResult();
972    
973                                            if (remoteLastGeneration > localLastGeneration) {
974                                                    bootupAddress = clusterNodeResponse.getAddress();
975    
976                                                    break;
977                                            }
978                                    }
979                                    catch (Exception e) {
980                                            if (_log.isDebugEnabled()) {
981                                                    _log.debug(
982                                                            "Suppress exception caused by remote method " +
983                                                                    "invocation",
984                                                            e);
985                                            }
986    
987                                            continue;
988                                    }
989                            }
990                            else {
991                                    if (_log.isDebugEnabled()) {
992                                            _log.debug(
993                                                    "Cluster node " + clusterNode +
994                                                            " has invalid port");
995                                    }
996                            }
997                    }
998                    while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
999    
1000                    if (bootupAddress == null) {
1001                            return;
1002                    }
1003    
1004                    if (_log.isInfoEnabled()) {
1005                            _log.info(
1006                                    "Start loading lucene index files from cluster node " +
1007                                            bootupAddress);
1008                    }
1009    
1010                    InputStream inputStream = null;
1011    
1012                    try {
1013                            inputStream = getLoadIndexesInputStreamFromCluster(
1014                                    companyId, bootupAddress);
1015    
1016                            indexAccessor.loadIndex(inputStream);
1017    
1018                            if (_log.isInfoEnabled()) {
1019                                    _log.info("Lucene index files loaded successfully");
1020                            }
1021                    }
1022                    catch (Exception e) {
1023                            _log.error("Unable to load index for company " + companyId, e);
1024                    }
1025                    finally {
1026                            if (inputStream != null) {
1027                                    try {
1028                                            inputStream.close();
1029                                    }
1030                                    catch (IOException ioe) {
1031                                            _log.error(
1032                                                    "Unable to close input stream for company " +
1033                                                            companyId,
1034                                                    ioe);
1035                                    }
1036                            }
1037                    }
1038            }
1039    
1040            private void _includeIfUnique(
1041                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1042                    Query query, BooleanClause.Occur occur) {
1043    
1044                    if (query instanceof TermQuery) {
1045                            Set<Term> terms = new HashSet<Term>();
1046    
1047                            TermQuery termQuery = (TermQuery)query;
1048    
1049                            termQuery.extractTerms(terms);
1050    
1051                            float boost = termQuery.getBoost();
1052    
1053                            for (Term term : terms) {
1054                                    String termValue = term.text();
1055    
1056                                    if (like) {
1057                                            termValue = termValue.toLowerCase(queryParser.getLocale());
1058    
1059                                            termValue = termValue.concat(StringPool.STAR);
1060    
1061                                            if (PropsValues.INDEX_SEARCH_LEADING_WILDCARD_ENABLED) {
1062                                                    termValue = StringPool.STAR.concat(termValue);
1063                                            }
1064    
1065                                            term = term.createTerm(termValue);
1066    
1067                                            query = new WildcardQuery(term);
1068                                    }
1069                                    else {
1070                                            query = new TermQuery(term);
1071                                    }
1072    
1073                                    query.setBoost(boost);
1074    
1075                                    boolean included = false;
1076    
1077                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1078                                            if (query.equals(booleanClause.getQuery())) {
1079                                                    included = true;
1080                                            }
1081                                    }
1082    
1083                                    if (!included) {
1084                                            booleanQuery.add(query, occur);
1085                                    }
1086                            }
1087                    }
1088                    else if (query instanceof BooleanQuery) {
1089                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
1090    
1091                            BooleanQuery containerBooleanQuery = new BooleanQuery();
1092    
1093                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1094                                    _includeIfUnique(
1095                                            containerBooleanQuery, like, queryParser,
1096                                            booleanClause.getQuery(), booleanClause.getOccur());
1097                            }
1098    
1099                            if (containerBooleanQuery.getClauses().length > 0) {
1100                                    booleanQuery.add(containerBooleanQuery, occur);
1101                            }
1102                    }
1103                    else {
1104                            boolean included = false;
1105    
1106                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1107                                    if (query.equals(booleanClause.getQuery())) {
1108                                            included = true;
1109                                    }
1110                            }
1111    
1112                            if (!included) {
1113                                    booleanQuery.add(query, occur);
1114                            }
1115                    }
1116            }
1117    
1118            private void _loadIndexFromCluster(
1119                            IndexAccessor indexAccessor, long localLastGeneration)
1120                    throws SystemException {
1121    
1122                    List<Address> clusterNodeAddresses =
1123                            ClusterExecutorUtil.getClusterNodeAddresses();
1124    
1125                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
1126    
1127                    if (clusterNodeAddressesCount <= 1) {
1128                            if (_log.isDebugEnabled()) {
1129                                    _log.debug(
1130                                            "Do not load indexes because there is either one portal " +
1131                                                    "instance or no portal instances in the cluster");
1132                            }
1133    
1134                            return;
1135                    }
1136    
1137                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1138                            new MethodHandler(
1139                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1140                            true);
1141    
1142                    FutureClusterResponses futureClusterResponses =
1143                            ClusterExecutorUtil.execute(clusterRequest);
1144    
1145                    _handleFutureClusterResponses(
1146                            futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1147                            localLastGeneration);
1148            }
1149    
        // Timeout for each cluster node response while bootstrapping indexes.
        // Polls use it as TimeUnit.MILLISECONDS, and it is also the argument
        // passed to the remote createToken call.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Max clause count applied to BooleanQuery by the constructor. It is read
        // from portal properties, with Lucene's current max clause count as the
        // fallback (presumably used when the property is unset or unparseable).

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        // Boolean operator keywords in lower and upper case. Their usage is
        // outside this view (presumably for normalizing query keywords).

        private static final String[] _KEYWORDS_LOWERCASE = {
                " and ", " not ", " or "
        };

        private static final String[] _KEYWORDS_UPPERCASE = {
                " AND ", " NOT ", " OR "
        };

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Remote method invoked on the bootup node to get the transient token
        // that accompanies the index dump URL

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);

        // Remote method used to ask cluster nodes for a company's last index
        // generation

        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        // Counted down by CompanyInitalizedListener.doPortalInit and awaited by
        // LoadIndexClusterEventListener before it queues index loaders

        private final CountDownLatch _countDownLatch = new CountDownLatch(1);

        private Analyzer _analyzer;

        // Cached index accessors, presumably keyed by company ID

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();

        // Registered by the constructor when cluster index loading is enabled,
        // and removed again in shutdown()

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;

        // Runs LuceneIndexer and ClusterIndexLoader tasks. Stays null unless
        // threaded startup indexing or cluster index loading required it.

        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
1181    
1182            private static class ShutdownSyncJob implements Runnable {
1183    
1184                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1185                            _countDownLatch = countDownLatch;
1186                    }
1187    
1188                    @Override
1189                    public void run() {
1190                            _countDownLatch.countDown();
1191    
1192                            try {
1193                                    synchronized (this) {
1194                                            wait();
1195                                    }
1196                            }
1197                            catch (InterruptedException ie) {
1198                            }
1199                    }
1200    
1201                    private final CountDownLatch _countDownLatch;
1202    
1203            }
1204    
1205            private class ClusterIndexLoader implements Runnable {
1206    
1207                    @Override
1208                    public void run() {
1209                            long lastGeneration = getLastGeneration(_companyId);
1210    
1211                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1212                                    return;
1213                            }
1214    
1215                            try {
1216                                    LuceneClusterUtil.loadIndexesFromCluster(_companyId);
1217                            }
1218                            catch (Exception e) {
1219                                    _log.error(
1220                                            "Unable to load indexes for company " + _companyId, e);
1221                            }
1222                    }
1223    
1224                    private ClusterIndexLoader(long companyId) {
1225                            _companyId = companyId;
1226                    }
1227    
1228                    private final long _companyId;
1229    
1230            }
1231    
1232            private class CompanyInitalizedListener extends BasePortalLifecycle {
1233    
1234                    @Override
1235                    protected void doPortalDestroy() throws Exception {
1236                    }
1237    
1238                    @Override
1239                    protected void doPortalInit() throws Exception {
1240                            _countDownLatch.countDown();
1241                    }
1242    
1243            }
1244    
1245            private class LoadIndexClusterEventListener
1246                    implements ClusterEventListener {
1247    
1248                    @Override
1249                    public void processClusterEvent(ClusterEvent clusterEvent) {
1250                            ClusterEventType clusterEventType =
1251                                    clusterEvent.getClusterEventType();
1252    
1253                            if (clusterEventType != ClusterEventType.JOIN) {
1254                                    return;
1255                            }
1256    
1257                            List<Address> clusterNodeAddresses =
1258                                    ClusterExecutorUtil.getClusterNodeAddresses();
1259                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1260    
1261                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1262                                    if (_log.isDebugEnabled()) {
1263                                            _log.debug(
1264                                                    "Number of original cluster members is greater than " +
1265                                                            "one");
1266                                    }
1267    
1268                                    return;
1269                            }
1270    
1271                            try {
1272                                    _countDownLatch.await();
1273                            }
1274                            catch (InterruptedException ie) {
1275                                    _log.error("Latch opened prematurely by interruption");
1276                            }
1277    
1278                            long[] companyIds = PortalInstances.getCompanyIds();
1279    
1280                            for (long companyId : companyIds) {
1281                                    _luceneIndexThreadPoolExecutor.execute(
1282                                            new ClusterIndexLoader(companyId));
1283                            }
1284    
1285                            _luceneIndexThreadPoolExecutor.execute(
1286                                    new ClusterIndexLoader(CompanyConstants.SYSTEM));
1287                    }
1288    
1289            }
1290    
1291    }