001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Destination;
036    import com.liferay.portal.kernel.messaging.MessageBus;
037    import com.liferay.portal.kernel.messaging.MessageBusUtil;
038    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039    import com.liferay.portal.kernel.search.BooleanClauseOccur;
040    import com.liferay.portal.kernel.search.Field;
041    import com.liferay.portal.kernel.search.SearchEngineUtil;
042    import com.liferay.portal.kernel.util.ArrayUtil;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.MethodHandler;
045    import com.liferay.portal.kernel.util.MethodKey;
046    import com.liferay.portal.kernel.util.ObjectValuePair;
047    import com.liferay.portal.kernel.util.PropsKeys;
048    import com.liferay.portal.kernel.util.StringBundler;
049    import com.liferay.portal.kernel.util.StringPool;
050    import com.liferay.portal.kernel.util.StringUtil;
051    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
052    import com.liferay.portal.kernel.util.Validator;
053    import com.liferay.portal.model.CompanyConstants;
054    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
055    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
056    import com.liferay.portal.security.auth.TransientTokenUtil;
057    import com.liferay.portal.util.PortalInstances;
058    import com.liferay.portal.util.PortalUtil;
059    import com.liferay.portal.util.PropsUtil;
060    import com.liferay.portal.util.PropsValues;
061    
062    import java.io.IOException;
063    import java.io.InputStream;
064    import java.io.OutputStream;
065    
066    import java.net.InetAddress;
067    import java.net.URL;
068    import java.net.URLConnection;
069    
070    import java.util.HashSet;
071    import java.util.List;
072    import java.util.Map;
073    import java.util.Set;
074    import java.util.concurrent.BlockingQueue;
075    import java.util.concurrent.ConcurrentHashMap;
076    import java.util.concurrent.CountDownLatch;
077    import java.util.concurrent.TimeUnit;
078    import java.util.concurrent.TimeoutException;
079    
080    import org.apache.commons.lang.time.StopWatch;
081    import org.apache.lucene.analysis.Analyzer;
082    import org.apache.lucene.analysis.TokenStream;
083    import org.apache.lucene.document.Document;
084    import org.apache.lucene.index.IndexReader;
085    import org.apache.lucene.index.Term;
086    import org.apache.lucene.queryParser.QueryParser;
087    import org.apache.lucene.search.BooleanClause;
088    import org.apache.lucene.search.BooleanQuery;
089    import org.apache.lucene.search.IndexSearcher;
090    import org.apache.lucene.search.NumericRangeQuery;
091    import org.apache.lucene.search.Query;
092    import org.apache.lucene.search.TermQuery;
093    import org.apache.lucene.search.TermRangeQuery;
094    import org.apache.lucene.search.WildcardQuery;
095    import org.apache.lucene.search.highlight.Formatter;
096    import org.apache.lucene.search.highlight.Highlighter;
097    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
098    import org.apache.lucene.search.highlight.QueryScorer;
099    import org.apache.lucene.search.highlight.SimpleFragmenter;
100    import org.apache.lucene.search.highlight.WeightedTerm;
101    import org.apache.lucene.util.Version;
102    
103    /**
104     * @author Brian Wing Shun Chan
105     * @author Harry Mark
106     * @author Bruno Farache
107     * @author Shuyang Zhou
108     * @author Tina Tian
109     * @author Hugo Huijser
110     * @author Andrea Di Giorgi
111     */
112    public class LuceneHelperImpl implements LuceneHelper {
113    
114            @Override
115            public void addDocument(long companyId, Document document)
116                    throws IOException {
117    
118                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
119    
120                    indexAccessor.addDocument(document);
121            }
122    
123            @Override
124            public void addExactTerm(
125                    BooleanQuery booleanQuery, String field, String value) {
126    
127                    addTerm(booleanQuery, field, value, false);
128            }
129    
130            @Override
131            public void addNumericRangeTerm(
132                    BooleanQuery booleanQuery, String field, Integer startValue,
133                    Integer endValue) {
134    
135                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
136                            field, startValue, endValue, true, true);
137    
138                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
139            }
140    
141            @Override
142            public void addNumericRangeTerm(
143                    BooleanQuery booleanQuery, String field, Long startValue,
144                    Long endValue) {
145    
146                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
147                            field, startValue, endValue, true, true);
148    
149                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
150            }
151    
152            /**
153             * @deprecated As of 6.2.0, replaced by {@link
154             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
155             */
156            @Override
157            public void addNumericRangeTerm(
158                    BooleanQuery booleanQuery, String field, String startValue,
159                    String endValue) {
160    
161                    addNumericRangeTerm(
162                            booleanQuery, field, GetterUtil.getLong(startValue),
163                            GetterUtil.getLong(endValue));
164            }
165    
166            @Override
167            public void addRangeTerm(
168                    BooleanQuery booleanQuery, String field, String startValue,
169                    String endValue) {
170    
171                    boolean includesLower = true;
172    
173                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
174                            includesLower = false;
175                    }
176    
177                    boolean includesUpper = true;
178    
179                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
180                            includesUpper = false;
181                    }
182    
183                    TermRangeQuery termRangeQuery = new TermRangeQuery(
184                            field, startValue, endValue, includesLower, includesUpper);
185    
186                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
187            }
188    
189            @Override
190            public void addRequiredTerm(
191                    BooleanQuery booleanQuery, String field, String value, boolean like) {
192    
193                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
194            }
195    
196            @Override
197            public void addRequiredTerm(
198                    BooleanQuery booleanQuery, String field, String[] values,
199                    boolean like) {
200    
201                    if (values == null) {
202                            return;
203                    }
204    
205                    BooleanQuery query = new BooleanQuery();
206    
207                    for (String value : values) {
208                            addTerm(query, field, value, like);
209                    }
210    
211                    booleanQuery.add(query, BooleanClause.Occur.MUST);
212            }
213    
214            @Override
215            public void addTerm(
216                    BooleanQuery booleanQuery, String field, String value, boolean like) {
217    
218                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
219            }
220    
221            @Override
222            public void addTerm(
223                    BooleanQuery booleanQuery, String field, String value, boolean like,
224                    BooleanClauseOccur booleanClauseOccur) {
225    
226                    if (Validator.isNull(value)) {
227                            return;
228                    }
229    
230                    Analyzer analyzer = getAnalyzer();
231    
232                    if (analyzer instanceof PerFieldAnalyzer) {
233                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
234    
235                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
236    
237                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
238                                    like = true;
239                            }
240                    }
241    
242                    if (like) {
243                            value = StringUtil.replace(
244                                    value, StringPool.PERCENT, StringPool.BLANK);
245                    }
246    
247                    try {
248                            QueryParser queryParser = new QueryParser(
249                                    getVersion(), field, analyzer);
250    
251                            Query query = queryParser.parse(value);
252    
253                            BooleanClause.Occur occur = null;
254    
255                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
256                                    occur = BooleanClause.Occur.MUST;
257                            }
258                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
259                                    occur = BooleanClause.Occur.MUST_NOT;
260                            }
261                            else {
262                                    occur = BooleanClause.Occur.SHOULD;
263                            }
264    
265                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
266                    }
267                    catch (Exception e) {
268                            if (_log.isWarnEnabled()) {
269                                    _log.warn(e, e);
270                            }
271                    }
272            }
273    
274            @Override
275            public void addTerm(
276                    BooleanQuery booleanQuery, String field, String[] values,
277                    boolean like) {
278    
279                    for (String value : values) {
280                            addTerm(booleanQuery, field, value, like);
281                    }
282            }
283    
284            /**
285             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
286             *             IndexSearcher)}
287             */
288            @Deprecated
289            @Override
290            public void cleanUp(IndexSearcher indexSearcher) {
291                    if (indexSearcher == null) {
292                            return;
293                    }
294    
295                    try {
296                            indexSearcher.close();
297    
298                            IndexReader indexReader = indexSearcher.getIndexReader();
299    
300                            if (indexReader != null) {
301                                    indexReader.close();
302                            }
303                    }
304                    catch (IOException ioe) {
305                            _log.error(ioe, ioe);
306                    }
307            }
308    
309            @Override
310            public int countScoredFieldNames(Query query, String[] filedNames) {
311                    int count = 0;
312    
313                    for (String fieldName : filedNames) {
314                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
315                                    query, false, fieldName);
316    
317                            if ((weightedTerms.length > 0) &&
318                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
319    
320                                    count++;
321                            }
322                    }
323    
324                    return count;
325            }
326    
327            @Override
328            public void delete(long companyId) {
329                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
330    
331                    if (indexAccessor == null) {
332                            return;
333                    }
334    
335                    indexAccessor.delete();
336            }
337    
338            @Override
339            public void deleteDocuments(long companyId, Term term) throws IOException {
340                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
341    
342                    if (indexAccessor == null) {
343                            return;
344                    }
345    
346                    indexAccessor.deleteDocuments(term);
347            }
348    
349            @Override
350            public void dumpIndex(long companyId, OutputStream outputStream)
351                    throws IOException {
352    
353                    long lastGeneration = getLastGeneration(companyId);
354    
355                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
356                            if (_log.isDebugEnabled()) {
357                                    _log.debug(
358                                            "Dump index from cluster is not enabled for " + companyId);
359                            }
360    
361                            return;
362                    }
363    
364                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
365    
366                    if (indexAccessor == null) {
367                            return;
368                    }
369    
370                    indexAccessor.dumpIndex(outputStream);
371            }
372    
373            @Override
374            public Analyzer getAnalyzer() {
375                    return _analyzer;
376            }
377    
378            @Override
379            public IndexAccessor getIndexAccessor(long companyId) {
380                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
381    
382                    if (indexAccessor != null) {
383                            return indexAccessor;
384                    }
385    
386                    synchronized (this) {
387                            indexAccessor = _indexAccessors.get(companyId);
388    
389                            if (indexAccessor == null) {
390                                    indexAccessor = new IndexAccessorImpl(companyId);
391    
392                                    if (isLoadIndexFromClusterEnabled()) {
393                                            indexAccessor = new SynchronizedIndexAccessorImpl(
394                                                    indexAccessor);
395    
396                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
397                                                    MessageValuesThreadLocal.getValue(
398                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
399    
400                                            if (clusterForwardMessage) {
401                                                    if (_log.isInfoEnabled()) {
402                                                            _log.info(
403                                                                    "Skip Luncene index files cluster loading " +
404                                                                            "since this is a manual reindex request");
405                                                    }
406                                            }
407                                            else {
408                                                    try {
409                                                            _loadIndexFromCluster(
410                                                                    indexAccessor,
411                                                                    indexAccessor.getLastGeneration());
412                                                    }
413                                                    catch (Exception e) {
414                                                            _log.error(
415                                                                    "Unable to load index for company " +
416                                                                            indexAccessor.getCompanyId(),
417                                                                    e);
418                                                    }
419                                            }
420                                    }
421    
422                                    _indexAccessors.put(companyId, indexAccessor);
423                            }
424                    }
425    
426                    return indexAccessor;
427            }
428    
429            @Override
430            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
431                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
432    
433                    return indexAccessor.acquireIndexSearcher();
434            }
435    
436            @Override
437            public long getLastGeneration(long companyId) {
438                    if (!isLoadIndexFromClusterEnabled()) {
439                            return IndexAccessor.DEFAULT_LAST_GENERATION;
440                    }
441    
442                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
443    
444                    if (indexAccessor == null) {
445                            return IndexAccessor.DEFAULT_LAST_GENERATION;
446                    }
447    
448                    return indexAccessor.getLastGeneration();
449            }
450    
451            @Override
452            public InputStream getLoadIndexesInputStreamFromCluster(
453                            long companyId, Address bootupAddress)
454                    throws SystemException {
455    
456                    if (!isLoadIndexFromClusterEnabled()) {
457                            return null;
458                    }
459    
460                    InputStream inputStream = null;
461    
462                    try {
463                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
464                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
465    
466                            URL url = bootupClusterNodeObjectValuePair.getValue();
467    
468                            URLConnection urlConnection = url.openConnection();
469    
470                            urlConnection.setDoOutput(true);
471    
472                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
473                                    urlConnection.getOutputStream());
474    
475                            unsyncPrintWriter.write("transientToken=");
476                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
477                            unsyncPrintWriter.write("&companyId=");
478                            unsyncPrintWriter.write(String.valueOf(companyId));
479    
480                            unsyncPrintWriter.close();
481    
482                            inputStream = urlConnection.getInputStream();
483    
484                            return inputStream;
485                    }
486                    catch (IOException ioe) {
487                            throw new SystemException(ioe);
488                    }
489            }
490    
491            @Override
492            public Set<String> getQueryTerms(Query query) {
493                    String queryString = StringUtil.replace(
494                            query.toString(), StringPool.STAR, StringPool.BLANK);
495    
496                    Query tempQuery = null;
497    
498                    try {
499                            QueryParser queryParser = new QueryParser(
500                                    getVersion(), StringPool.BLANK, getAnalyzer());
501    
502                            tempQuery = queryParser.parse(queryString);
503                    }
504                    catch (Exception e) {
505                            if (_log.isWarnEnabled()) {
506                                    _log.warn("Unable to parse " + queryString);
507                            }
508    
509                            tempQuery = query;
510                    }
511    
512                    WeightedTerm[] weightedTerms = null;
513    
514                    for (String fieldName : Field.KEYWORDS) {
515                            weightedTerms = QueryTermExtractor.getTerms(
516                                    tempQuery, false, fieldName);
517    
518                            if (weightedTerms.length > 0) {
519                                    break;
520                            }
521                    }
522    
523                    Set<String> queryTerms = new HashSet<String>();
524    
525                    for (WeightedTerm weightedTerm : weightedTerms) {
526                            queryTerms.add(weightedTerm.getTerm());
527                    }
528    
529                    return queryTerms;
530            }
531    
532            /**
533             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
534             */
535            @Deprecated
536            @Override
537            public IndexSearcher getSearcher(long companyId, boolean readOnly)
538                    throws IOException {
539    
540                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
541    
542                    IndexReader indexReader = IndexReader.open(
543                            indexAccessor.getLuceneDir(), readOnly);
544    
545                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
546    
547                    indexSearcher.setDefaultFieldSortScoring(true, false);
548                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
549    
550                    return indexSearcher;
551            }
552    
553            @Override
554            public String getSnippet(
555                            Query query, String field, String s, int maxNumFragments,
556                            int fragmentLength, String fragmentSuffix, Formatter formatter)
557                    throws IOException {
558    
559                    QueryScorer queryScorer = new QueryScorer(query, field);
560    
561                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
562    
563                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
564    
565                    TokenStream tokenStream = getAnalyzer().tokenStream(
566                            field, new UnsyncStringReader(s));
567    
568                    try {
569                            String snippet = highlighter.getBestFragments(
570                                    tokenStream, s, maxNumFragments, fragmentSuffix);
571    
572                            if (Validator.isNotNull(snippet) &&
573                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
574                                    !s.equals(snippet)) {
575    
576                                    snippet = snippet.concat(fragmentSuffix);
577                            }
578    
579                            return snippet;
580                    }
581                    catch (InvalidTokenOffsetsException itoe) {
582                            throw new IOException(itoe.getMessage());
583                    }
584            }
585    
586            @Override
587            public Version getVersion() {
588                    return _version;
589            }
590    
591            @Override
592            public boolean isLoadIndexFromClusterEnabled() {
593                    if (PropsValues.CLUSTER_LINK_ENABLED &&
594                            PropsValues.LUCENE_REPLICATE_WRITE) {
595    
596                            return true;
597                    }
598    
599                    if (_log.isDebugEnabled()) {
600                            _log.debug("Load index from cluster is not enabled");
601                    }
602    
603                    return false;
604            }
605    
606            @Override
607            public void loadIndex(long companyId, InputStream inputStream)
608                    throws IOException {
609    
610                    if (!isLoadIndexFromClusterEnabled()) {
611                            return;
612                    }
613    
614                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
615    
616                    if (indexAccessor == null) {
617                            if (_log.isInfoEnabled()) {
618                                    _log.info(
619                                            "Skip loading Lucene index files for company " + companyId +
620                                                    " in favor of lazy loading");
621                            }
622    
623                            return;
624                    }
625    
626                    StopWatch stopWatch = new StopWatch();
627    
628                    stopWatch.start();
629    
630                    if (_log.isInfoEnabled()) {
631                            _log.info(
632                                    "Start loading Lucene index files for company " + companyId);
633                    }
634    
635                    indexAccessor.loadIndex(inputStream);
636    
637                    if (_log.isInfoEnabled()) {
638                            _log.info(
639                                    "Finished loading index files for company " + companyId +
640                                            " in " + stopWatch.getTime() + " ms");
641                    }
642            }
643    
644            @Override
645            public void loadIndexesFromCluster(long companyId) throws SystemException {
646                    if (!isLoadIndexFromClusterEnabled()) {
647                            return;
648                    }
649    
650                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
651    
652                    if (indexAccessor == null) {
653                            return;
654                    }
655    
656                    long localLastGeneration = getLastGeneration(companyId);
657    
658                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
659            }
660    
661            @Override
662            public void releaseIndexSearcher(
663                            long companyId, IndexSearcher indexSearcher)
664                    throws IOException {
665    
666                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
667    
668                    indexAccessor.releaseIndexSearcher(indexSearcher);
669            }
670    
671            public void setAnalyzer(Analyzer analyzer) {
672                    _analyzer = analyzer;
673            }
674    
675            public void setVersion(Version version) {
676                    _version = version;
677            }
678    
679            @Override
680            public void shutdown() {
681                    if (_luceneIndexThreadPoolExecutor != null) {
682                            _luceneIndexThreadPoolExecutor.shutdownNow();
683    
684                            try {
685                                    _luceneIndexThreadPoolExecutor.awaitTermination(
686                                            60, TimeUnit.SECONDS);
687                            }
688                            catch (InterruptedException ie) {
689                                    _log.error("Lucene indexer shutdown interrupted", ie);
690                            }
691                    }
692    
693                    if (isLoadIndexFromClusterEnabled()) {
694                            ClusterExecutorUtil.removeClusterEventListener(
695                                    _loadIndexClusterEventListener);
696                    }
697    
698                    MessageBus messageBus = MessageBusUtil.getMessageBus();
699    
700                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
701                            String searchWriterDestinationName =
702                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
703    
704                            Destination searchWriteDestination = messageBus.getDestination(
705                                    searchWriterDestinationName);
706    
707                            if (searchWriteDestination != null) {
708                                    ThreadPoolExecutor threadPoolExecutor =
709                                            PortalExecutorManagerUtil.getPortalExecutor(
710                                                    searchWriterDestinationName);
711    
712                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
713    
714                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
715    
716                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
717                                            countDownLatch);
718    
719                                    for (int i = 0; i < maxPoolSize; i++) {
720                                            threadPoolExecutor.submit(shutdownSyncJob);
721                                    }
722    
723                                    try {
724                                            countDownLatch.await();
725                                    }
726                                    catch (InterruptedException ie) {
727                                            _log.error("Shutdown waiting interrupted", ie);
728                                    }
729    
730                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
731    
732                                    if (_log.isDebugEnabled()) {
733                                            _log.debug(
734                                                    "Cancelled appending indexing jobs: " + runnables);
735                                    }
736    
737                                    searchWriteDestination.close(true);
738                            }
739                    }
740    
741                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
742                            indexAccessor.close();
743                    }
744            }
745    
746            @Override
747            public void shutdown(long companyId) {
748                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
749    
750                    _indexAccessors.remove(indexAccessor);
751    
752                    indexAccessor.close();
753            }
754    
755            @Override
756            public void startup(long companyId) {
757                    if (!PropsValues.INDEX_ON_STARTUP) {
758                            return;
759                    }
760    
761                    if (_log.isInfoEnabled()) {
762                            _log.info("Indexing Lucene on startup");
763                    }
764    
765                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
766    
767                    if (PropsValues.INDEX_WITH_THREAD) {
768                            if (_luceneIndexThreadPoolExecutor == null) {
769    
770                                    // This should never be null except for the case where
771                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
772                                    // PropsValues#INDEX_ON_STARTUP to true.
773    
774                                    _luceneIndexThreadPoolExecutor =
775                                            PortalExecutorManagerUtil.getPortalExecutor(
776                                                    LuceneHelperImpl.class.getName());
777                            }
778    
779                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
780                    }
781                    else {
782                            luceneIndexer.reindex();
783                    }
784            }
785    
786            @Override
787            public void updateDocument(long companyId, Term term, Document document)
788                    throws IOException {
789    
790                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
791    
792                    indexAccessor.updateDocument(term, document);
793            }
794    
795            private LuceneHelperImpl() {
796                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
797                            _luceneIndexThreadPoolExecutor =
798                                    PortalExecutorManagerUtil.getPortalExecutor(
799                                            LuceneHelperImpl.class.getName());
800                    }
801    
802                    if (isLoadIndexFromClusterEnabled()) {
803                            _loadIndexClusterEventListener =
804                                    new LoadIndexClusterEventListener();
805    
806                            ClusterExecutorUtil.addClusterEventListener(
807                                    _loadIndexClusterEventListener);
808                    }
809    
810                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
811            }
812    
813            private ObjectValuePair<String, URL>
814                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
815                    throws SystemException {
816    
817                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
818                            new MethodHandler(
819                                    _createTokenMethodKey,
820                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
821                            bootupAddress);
822    
823                    FutureClusterResponses futureClusterResponses =
824                            ClusterExecutorUtil.execute(clusterRequest);
825    
826                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
827                            futureClusterResponses.getPartialResults();
828    
829                    try {
830                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
831                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
832                                    TimeUnit.MILLISECONDS);
833    
834                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
835    
836                            int port = clusterNode.getPort();
837    
838                            if (port <= 0) {
839                                    StringBundler sb = new StringBundler(6);
840    
841                                    sb.append("Invalid cluster node port ");
842                                    sb.append(port);
843                                    sb.append(". The port is set by the first request or ");
844                                    sb.append("configured in portal.properties by the properties ");
845                                    sb.append("\"portal.instance.http.port\" and ");
846                                    sb.append("\"portal.instance.https.port\".");
847    
848                                    throw new Exception(sb.toString());
849                            }
850    
851                            String protocol = clusterNode.getPortalProtocol();
852    
853                            if (Validator.isNull(protocol)) {
854                                    StringBundler sb = new StringBundler(4);
855    
856                                    sb.append("Cluster node protocol is empty. The protocol is ");
857                                    sb.append("set by the first request or configured in ");
858                                    sb.append("portal.properties by the property ");
859                                    sb.append("\"portal.instance.protocol\"");
860    
861                                    throw new Exception(sb.toString());
862                            }
863    
864                            InetAddress inetAddress = clusterNode.getInetAddress();
865    
866                            String fileName = PortalUtil.getPathContext();
867    
868                            if (!fileName.endsWith(StringPool.SLASH)) {
869                                    fileName = fileName.concat(StringPool.SLASH);
870                            }
871    
872                            fileName = fileName.concat("lucene/dump");
873    
874                            URL url = new URL(
875                                    protocol, inetAddress.getHostAddress(), port, fileName);
876    
877                            String transientToken = (String)clusterNodeResponse.getResult();
878    
879                            return new ObjectValuePair<String, URL>(transientToken, url);
880                    }
881                    catch (Exception e) {
882                            throw new SystemException(e);
883                    }
884            }
885    
886            private void _includeIfUnique(
887                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
888                    Query query, BooleanClause.Occur occur) {
889    
890                    if (query instanceof TermQuery) {
891                            Set<Term> terms = new HashSet<Term>();
892    
893                            TermQuery termQuery = (TermQuery)query;
894    
895                            termQuery.extractTerms(terms);
896    
897                            float boost = termQuery.getBoost();
898    
899                            for (Term term : terms) {
900                                    String termValue = term.text();
901    
902                                    if (like) {
903                                            termValue = termValue.toLowerCase(queryParser.getLocale());
904    
905                                            term = term.createTerm(
906                                                    StringPool.STAR.concat(termValue).concat(
907                                                            StringPool.STAR));
908    
909                                            query = new WildcardQuery(term);
910                                    }
911                                    else {
912                                            query = new TermQuery(term);
913                                    }
914    
915                                    query.setBoost(boost);
916    
917                                    boolean included = false;
918    
919                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
920                                            if (query.equals(booleanClause.getQuery())) {
921                                                    included = true;
922                                            }
923                                    }
924    
925                                    if (!included) {
926                                            booleanQuery.add(query, occur);
927                                    }
928                            }
929                    }
930                    else if (query instanceof BooleanQuery) {
931                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
932    
933                            BooleanQuery containerBooleanQuery = new BooleanQuery();
934    
935                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
936                                    _includeIfUnique(
937                                            containerBooleanQuery, like, queryParser,
938                                            booleanClause.getQuery(), booleanClause.getOccur());
939                            }
940    
941                            if (containerBooleanQuery.getClauses().length > 0) {
942                                    booleanQuery.add(containerBooleanQuery, occur);
943                            }
944                    }
945                    else {
946                            boolean included = false;
947    
948                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
949                                    if (query.equals(booleanClause.getQuery())) {
950                                            included = true;
951                                    }
952                            }
953    
954                            if (!included) {
955                                    booleanQuery.add(query, occur);
956                            }
957                    }
958            }
959    
960            private void _loadIndexFromCluster(
961                            IndexAccessor indexAccessor, long localLastGeneration)
962                    throws SystemException {
963    
964                    List<Address> clusterNodeAddresses =
965                            ClusterExecutorUtil.getClusterNodeAddresses();
966    
967                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
968    
969                    if (clusterNodeAddressesCount <= 1) {
970                            if (_log.isDebugEnabled()) {
971                                    _log.debug(
972                                            "Do not load indexes because there is either one portal " +
973                                                    "instance or no portal instances in the cluster");
974                            }
975    
976                            return;
977                    }
978    
979                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
980                            new MethodHandler(
981                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
982                            true);
983    
984                    ClusterExecutorUtil.execute(
985                            clusterRequest,
986                            new LoadIndexClusterResponseCallback(
987                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
988            }
989    
        // Timeout used, in milliseconds, when polling for cluster node
        // responses. It is also passed as the argument of the remote createToken
        // invocation.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Maximum boolean query clause count applied in the constructor. It is
        // read from the portal properties, with Lucene's current maximum supplied
        // as the default value.

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for the calls sent to other cluster nodes:
        // TransientTokenUtil.createToken(long) and
        // LuceneHelperUtil.getLastGeneration(long)

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        // NOTE(review): _analyzer and _version are not used in this part of the
        // class. Presumably they hold the Lucene analyzer and version in use;
        // confirm against the rest of the file.

        private Analyzer _analyzer;

        // Cache of index accessors. Presumably keyed by company ID; verify
        // against getIndexAccessor(long).

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();

        // Set in the constructor only when loading indexes from the cluster is
        // enabled, and unregistered again during shutdown

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;

        // Executor running the startup indexer. It is created in the constructor
        // or lazily in startup(long).

        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
1011    
1012            private static class ShutdownSyncJob implements Runnable {
1013    
1014                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1015                            _countDownLatch = countDownLatch;
1016                    }
1017    
1018                    @Override
1019                    public void run() {
1020                            _countDownLatch.countDown();
1021    
1022                            try {
1023                                    synchronized (this) {
1024                                            wait();
1025                                    }
1026                            }
1027                            catch (InterruptedException ie) {
1028                            }
1029                    }
1030    
1031                    private final CountDownLatch _countDownLatch;
1032    
1033            }
1034    
1035            private class LoadIndexClusterEventListener
1036                    implements ClusterEventListener {
1037    
1038                    @Override
1039                    public void processClusterEvent(ClusterEvent clusterEvent) {
1040                            ClusterEventType clusterEventType =
1041                                    clusterEvent.getClusterEventType();
1042    
1043                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1044                                    return;
1045                            }
1046    
1047                            List<Address> clusterNodeAddresses =
1048                                    ClusterExecutorUtil.getClusterNodeAddresses();
1049                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1050    
1051                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1052                                    if (_log.isDebugEnabled()) {
1053                                            _log.debug(
1054                                                    "Number of original cluster members is greater than " +
1055                                                            "one");
1056                                    }
1057    
1058                                    return;
1059                            }
1060    
1061                            long[] companyIds = PortalInstances.getCompanyIds();
1062    
1063                            for (long companyId : companyIds) {
1064                                    loadIndexes(companyId);
1065                            }
1066    
1067                            loadIndexes(CompanyConstants.SYSTEM);
1068                    }
1069    
1070                    private void loadIndexes(long companyId) {
1071                            long lastGeneration = getLastGeneration(companyId);
1072    
1073                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1074                                    return;
1075                            }
1076    
1077                            try {
1078                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1079                            }
1080                            catch (Exception e) {
1081                                    _log.error(
1082                                            "Unable to load indexes for company " + companyId, e);
1083                            }
1084                    }
1085    
1086            }
1087    
1088            private class LoadIndexClusterResponseCallback
1089                    extends BaseClusterResponseCallback {
1090    
1091                    public LoadIndexClusterResponseCallback(
1092                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1093                            long localLastGeneration) {
1094    
1095                            _indexAccessor = indexAccessor;
1096                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
1097                            _localLastGeneration = localLastGeneration;
1098    
1099                            _companyId = _indexAccessor.getCompanyId();
1100                    }
1101    
1102                    @Override
1103                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1104                            Address bootupAddress = null;
1105    
1106                            do {
1107                                    _clusterNodeAddressesCount--;
1108    
1109                                    ClusterNodeResponse clusterNodeResponse = null;
1110    
1111                                    try {
1112                                            clusterNodeResponse = blockingQueue.poll(
1113                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1114                                                    TimeUnit.MILLISECONDS);
1115                                    }
1116                                    catch (Exception e) {
1117                                            _log.error("Unable to get cluster node response", e);
1118                                    }
1119    
1120                                    if (clusterNodeResponse == null) {
1121                                            if (_log.isDebugEnabled()) {
1122                                                    _log.debug(
1123                                                            "Unable to get cluster node response in " +
1124                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1125                                                                            TimeUnit.MILLISECONDS);
1126                                            }
1127    
1128                                            continue;
1129                                    }
1130    
1131                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1132    
1133                                    if (clusterNode.getPort() > 0) {
1134                                            try {
1135                                                    long remoteLastGeneration =
1136                                                            (Long)clusterNodeResponse.getResult();
1137    
1138                                                    if (remoteLastGeneration > _localLastGeneration) {
1139                                                            bootupAddress = clusterNodeResponse.getAddress();
1140    
1141                                                            break;
1142                                                    }
1143                                            }
1144                                            catch (Exception e) {
1145                                                    if (_log.isDebugEnabled()) {
1146                                                            _log.debug(
1147                                                                    "Suppress exception caused by remote method " +
1148                                                                            "invocation",
1149                                                                    e);
1150                                                    }
1151    
1152                                                    continue;
1153                                            }
1154                                    }
1155                                    else {
1156                                            if (_log.isDebugEnabled()) {
1157                                                    _log.debug(
1158                                                            "Cluster node " + clusterNode +
1159                                                                    " has invalid port");
1160                                            }
1161                                    }
1162                            }
1163                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1164    
1165                            if (bootupAddress == null) {
1166                                    return;
1167                            }
1168    
1169                            if (_log.isInfoEnabled()) {
1170                                    _log.info(
1171                                            "Start loading lucene index files from cluster node " +
1172                                                    bootupAddress);
1173                            }
1174    
1175                            InputStream inputStream = null;
1176    
1177                            try {
1178                                    inputStream = getLoadIndexesInputStreamFromCluster(
1179                                            _companyId, bootupAddress);
1180    
1181                                    _indexAccessor.loadIndex(inputStream);
1182    
1183                                    if (_log.isInfoEnabled()) {
1184                                            _log.info("Lucene index files loaded successfully");
1185                                    }
1186                            }
1187                            catch (Exception e) {
1188                                    _log.error("Unable to load index for company " + _companyId, e);
1189                            }
1190                            finally {
1191                                    if (inputStream != null) {
1192                                            try {
1193                                                    inputStream.close();
1194                                            }
1195                                            catch (IOException ioe) {
1196                                                    _log.error(
1197                                                            "Unable to close input stream for company " +
1198                                                                    _companyId,
1199                                                            ioe);
1200                                            }
1201                                    }
1202                            }
1203                    }
1204    
1205                    @Override
1206                    public void processTimeoutException(TimeoutException timeoutException) {
1207                            _log.error(
1208                                    "Unable to load index for company " + _companyId,
1209                                    timeoutException);
1210                    }
1211    
1212                    private int _clusterNodeAddressesCount;
1213                    private long _companyId;
1214                    private IndexAccessor _indexAccessor;
1215                    private long _localLastGeneration;
1216    
1217            }
1218    
1219    }