001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLink;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.messaging.Destination;
035    import com.liferay.portal.kernel.messaging.MessageBus;
036    import com.liferay.portal.kernel.messaging.MessageBusUtil;
037    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038    import com.liferay.portal.kernel.search.BooleanClauseOccur;
039    import com.liferay.portal.kernel.search.Field;
040    import com.liferay.portal.kernel.search.SearchEngineUtil;
041    import com.liferay.portal.kernel.util.ArrayUtil;
042    import com.liferay.portal.kernel.util.GetterUtil;
043    import com.liferay.portal.kernel.util.MethodHandler;
044    import com.liferay.portal.kernel.util.MethodKey;
045    import com.liferay.portal.kernel.util.ObjectValuePair;
046    import com.liferay.portal.kernel.util.PropsKeys;
047    import com.liferay.portal.kernel.util.StringBundler;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.kernel.util.StringUtil;
050    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051    import com.liferay.portal.kernel.util.Validator;
052    import com.liferay.portal.model.CompanyConstants;
053    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055    import com.liferay.portal.security.auth.TransientTokenUtil;
056    import com.liferay.portal.util.PortalInstances;
057    import com.liferay.portal.util.PortalUtil;
058    import com.liferay.portal.util.PropsUtil;
059    import com.liferay.portal.util.PropsValues;
060    
061    import java.io.IOException;
062    import java.io.InputStream;
063    import java.io.OutputStream;
064    
065    import java.net.InetAddress;
066    import java.net.URL;
067    import java.net.URLConnection;
068    
069    import java.util.HashSet;
070    import java.util.List;
071    import java.util.Map;
072    import java.util.Set;
073    import java.util.concurrent.BlockingQueue;
074    import java.util.concurrent.ConcurrentHashMap;
075    import java.util.concurrent.CountDownLatch;
076    import java.util.concurrent.TimeUnit;
077    
078    import org.apache.commons.lang.time.StopWatch;
079    import org.apache.lucene.analysis.Analyzer;
080    import org.apache.lucene.analysis.TokenStream;
081    import org.apache.lucene.document.Document;
082    import org.apache.lucene.index.IndexReader;
083    import org.apache.lucene.index.Term;
084    import org.apache.lucene.queryParser.QueryParser;
085    import org.apache.lucene.search.BooleanClause;
086    import org.apache.lucene.search.BooleanQuery;
087    import org.apache.lucene.search.IndexSearcher;
088    import org.apache.lucene.search.NumericRangeQuery;
089    import org.apache.lucene.search.Query;
090    import org.apache.lucene.search.TermQuery;
091    import org.apache.lucene.search.TermRangeQuery;
092    import org.apache.lucene.search.WildcardQuery;
093    import org.apache.lucene.search.highlight.Formatter;
094    import org.apache.lucene.search.highlight.Highlighter;
095    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096    import org.apache.lucene.search.highlight.QueryScorer;
097    import org.apache.lucene.search.highlight.SimpleFragmenter;
098    import org.apache.lucene.search.highlight.WeightedTerm;
099    import org.apache.lucene.util.Version;
100    
101    /**
102     * @author Brian Wing Shun Chan
103     * @author Harry Mark
104     * @author Bruno Farache
105     * @author Shuyang Zhou
106     * @author Tina Tian
107     * @author Hugo Huijser
108     * @author Andrea Di Giorgi
109     */
110    public class LuceneHelperImpl implements LuceneHelper {
111    
112            @Override
113            public void addDocument(long companyId, Document document)
114                    throws IOException {
115    
116                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
117    
118                    indexAccessor.addDocument(document);
119            }
120    
121            @Override
122            public void addExactTerm(
123                    BooleanQuery booleanQuery, String field, String value) {
124    
125                    addTerm(booleanQuery, field, value, false);
126            }
127    
128            @Override
129            public void addNumericRangeTerm(
130                    BooleanQuery booleanQuery, String field, Integer startValue,
131                    Integer endValue) {
132    
133                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134                            field, startValue, endValue, true, true);
135    
136                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137            }
138    
139            @Override
140            public void addNumericRangeTerm(
141                    BooleanQuery booleanQuery, String field, Long startValue,
142                    Long endValue) {
143    
144                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145                            field, startValue, endValue, true, true);
146    
147                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148            }
149    
150            /**
151             * @deprecated As of 6.2.0, replaced by {@link
152             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
153             */
154            @Override
155            public void addNumericRangeTerm(
156                    BooleanQuery booleanQuery, String field, String startValue,
157                    String endValue) {
158    
159                    addNumericRangeTerm(
160                            booleanQuery, field, GetterUtil.getLong(startValue),
161                            GetterUtil.getLong(endValue));
162            }
163    
164            @Override
165            public void addRangeTerm(
166                    BooleanQuery booleanQuery, String field, String startValue,
167                    String endValue) {
168    
169                    boolean includesLower = true;
170    
171                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172                            includesLower = false;
173                    }
174    
175                    boolean includesUpper = true;
176    
177                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178                            includesUpper = false;
179                    }
180    
181                    TermRangeQuery termRangeQuery = new TermRangeQuery(
182                            field, startValue, endValue, includesLower, includesUpper);
183    
184                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185            }
186    
187            @Override
188            public void addRequiredTerm(
189                    BooleanQuery booleanQuery, String field, String value, boolean like) {
190    
191                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192            }
193    
194            @Override
195            public void addRequiredTerm(
196                    BooleanQuery booleanQuery, String field, String[] values,
197                    boolean like) {
198    
199                    if (values == null) {
200                            return;
201                    }
202    
203                    BooleanQuery query = new BooleanQuery();
204    
205                    for (String value : values) {
206                            addTerm(query, field, value, like);
207                    }
208    
209                    booleanQuery.add(query, BooleanClause.Occur.MUST);
210            }
211    
212            @Override
213            public void addTerm(
214                    BooleanQuery booleanQuery, String field, String value, boolean like) {
215    
216                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217            }
218    
219            @Override
220            public void addTerm(
221                    BooleanQuery booleanQuery, String field, String value, boolean like,
222                    BooleanClauseOccur booleanClauseOccur) {
223    
224                    if (Validator.isNull(value)) {
225                            return;
226                    }
227    
228                    Analyzer analyzer = getAnalyzer();
229    
230                    if (analyzer instanceof PerFieldAnalyzer) {
231                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232    
233                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234    
235                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236                                    like = true;
237                            }
238                    }
239    
240                    if (like) {
241                            value = StringUtil.replace(
242                                    value, StringPool.PERCENT, StringPool.BLANK);
243                    }
244    
245                    try {
246                            QueryParser queryParser = new QueryParser(
247                                    getVersion(), field, analyzer);
248    
249                            Query query = queryParser.parse(value);
250    
251                            BooleanClause.Occur occur = null;
252    
253                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
254                                    occur = BooleanClause.Occur.MUST;
255                            }
256                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
257                                    occur = BooleanClause.Occur.MUST_NOT;
258                            }
259                            else {
260                                    occur = BooleanClause.Occur.SHOULD;
261                            }
262    
263                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
264                    }
265                    catch (Exception e) {
266                            if (_log.isWarnEnabled()) {
267                                    _log.warn(e, e);
268                            }
269                    }
270            }
271    
272            @Override
273            public void addTerm(
274                    BooleanQuery booleanQuery, String field, String[] values,
275                    boolean like) {
276    
277                    for (String value : values) {
278                            addTerm(booleanQuery, field, value, like);
279                    }
280            }
281    
282            /**
283             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
284             *             IndexSearcher)}
285             */
286            @Deprecated
287            @Override
288            public void cleanUp(IndexSearcher indexSearcher) {
289                    if (indexSearcher == null) {
290                            return;
291                    }
292    
293                    try {
294                            indexSearcher.close();
295    
296                            IndexReader indexReader = indexSearcher.getIndexReader();
297    
298                            if (indexReader != null) {
299                                    indexReader.close();
300                            }
301                    }
302                    catch (IOException ioe) {
303                            _log.error(ioe, ioe);
304                    }
305            }
306    
307            @Override
308            public int countScoredFieldNames(Query query, String[] filedNames) {
309                    int count = 0;
310    
311                    for (String fieldName : filedNames) {
312                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
313                                    query, false, fieldName);
314    
315                            if ((weightedTerms.length > 0) &&
316                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
317    
318                                    count++;
319                            }
320                    }
321    
322                    return count;
323            }
324    
325            @Override
326            public void delete(long companyId) {
327                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
328    
329                    if (indexAccessor == null) {
330                            return;
331                    }
332    
333                    indexAccessor.delete();
334            }
335    
336            @Override
337            public void deleteDocuments(long companyId, Term term) throws IOException {
338                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
339    
340                    if (indexAccessor == null) {
341                            return;
342                    }
343    
344                    indexAccessor.deleteDocuments(term);
345            }
346    
347            @Override
348            public void dumpIndex(long companyId, OutputStream outputStream)
349                    throws IOException {
350    
351                    long lastGeneration = getLastGeneration(companyId);
352    
353                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
354                            if (_log.isDebugEnabled()) {
355                                    _log.debug(
356                                            "Dump index from cluster is not enabled for " + companyId);
357                            }
358    
359                            return;
360                    }
361    
362                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
363    
364                    if (indexAccessor == null) {
365                            return;
366                    }
367    
368                    indexAccessor.dumpIndex(outputStream);
369            }
370    
371            @Override
372            public Analyzer getAnalyzer() {
373                    return _analyzer;
374            }
375    
376            @Override
377            public IndexAccessor getIndexAccessor(long companyId) {
378                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
379    
380                    if (indexAccessor != null) {
381                            return indexAccessor;
382                    }
383    
384                    synchronized (this) {
385                            indexAccessor = _indexAccessors.get(companyId);
386    
387                            if (indexAccessor == null) {
388                                    indexAccessor = new IndexAccessorImpl(companyId);
389    
390                                    if (isLoadIndexFromClusterEnabled()) {
391                                            indexAccessor = new SynchronizedIndexAccessorImpl(
392                                                    indexAccessor);
393    
394                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
395                                                    MessageValuesThreadLocal.getValue(
396                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
397    
398                                            if (clusterForwardMessage) {
399                                                    if (_log.isInfoEnabled()) {
400                                                            _log.info(
401                                                                    "Skip Luncene index files cluster loading " +
402                                                                            "since this is a manual reindex request");
403                                                    }
404                                            }
405                                            else {
406                                                    try {
407                                                            _loadIndexFromCluster(
408                                                                    indexAccessor,
409                                                                    indexAccessor.getLastGeneration());
410                                                    }
411                                                    catch (Exception e) {
412                                                            _log.error(
413                                                                    "Unable to load index for company " +
414                                                                            indexAccessor.getCompanyId(),
415                                                                    e);
416                                                    }
417                                            }
418                                    }
419    
420                                    _indexAccessors.put(companyId, indexAccessor);
421                            }
422                    }
423    
424                    return indexAccessor;
425            }
426    
427            @Override
428            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
429                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
430    
431                    return indexAccessor.acquireIndexSearcher();
432            }
433    
434            @Override
435            public long getLastGeneration(long companyId) {
436                    if (!isLoadIndexFromClusterEnabled()) {
437                            return IndexAccessor.DEFAULT_LAST_GENERATION;
438                    }
439    
440                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
441    
442                    if (indexAccessor == null) {
443                            return IndexAccessor.DEFAULT_LAST_GENERATION;
444                    }
445    
446                    return indexAccessor.getLastGeneration();
447            }
448    
449            @Override
450            public InputStream getLoadIndexesInputStreamFromCluster(
451                            long companyId, Address bootupAddress)
452                    throws SystemException {
453    
454                    if (!isLoadIndexFromClusterEnabled()) {
455                            return null;
456                    }
457    
458                    InputStream inputStream = null;
459    
460                    try {
461                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
462                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
463    
464                            URL url = bootupClusterNodeObjectValuePair.getValue();
465    
466                            URLConnection urlConnection = url.openConnection();
467    
468                            urlConnection.setDoOutput(true);
469    
470                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
471                                    urlConnection.getOutputStream());
472    
473                            unsyncPrintWriter.write("transientToken=");
474                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
475                            unsyncPrintWriter.write("&companyId=");
476                            unsyncPrintWriter.write(String.valueOf(companyId));
477    
478                            unsyncPrintWriter.close();
479    
480                            inputStream = urlConnection.getInputStream();
481    
482                            return inputStream;
483                    }
484                    catch (IOException ioe) {
485                            throw new SystemException(ioe);
486                    }
487            }
488    
489            @Override
490            public Set<String> getQueryTerms(Query query) {
491                    String queryString = StringUtil.replace(
492                            query.toString(), StringPool.STAR, StringPool.BLANK);
493    
494                    Query tempQuery = null;
495    
496                    try {
497                            QueryParser queryParser = new QueryParser(
498                                    getVersion(), StringPool.BLANK, getAnalyzer());
499    
500                            tempQuery = queryParser.parse(queryString);
501                    }
502                    catch (Exception e) {
503                            if (_log.isWarnEnabled()) {
504                                    _log.warn("Unable to parse " + queryString);
505                            }
506    
507                            tempQuery = query;
508                    }
509    
510                    WeightedTerm[] weightedTerms = null;
511    
512                    for (String fieldName : Field.KEYWORDS) {
513                            weightedTerms = QueryTermExtractor.getTerms(
514                                    tempQuery, false, fieldName);
515    
516                            if (weightedTerms.length > 0) {
517                                    break;
518                            }
519                    }
520    
521                    Set<String> queryTerms = new HashSet<String>();
522    
523                    for (WeightedTerm weightedTerm : weightedTerms) {
524                            queryTerms.add(weightedTerm.getTerm());
525                    }
526    
527                    return queryTerms;
528            }
529    
530            /**
531             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
532             */
533            @Deprecated
534            @Override
535            public IndexSearcher getSearcher(long companyId, boolean readOnly)
536                    throws IOException {
537    
538                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
539    
540                    IndexReader indexReader = IndexReader.open(
541                            indexAccessor.getLuceneDir(), readOnly);
542    
543                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
544    
545                    indexSearcher.setDefaultFieldSortScoring(true, false);
546                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
547    
548                    return indexSearcher;
549            }
550    
551            @Override
552            public String getSnippet(
553                            Query query, String field, String s, int maxNumFragments,
554                            int fragmentLength, String fragmentSuffix, Formatter formatter)
555                    throws IOException {
556    
557                    QueryScorer queryScorer = new QueryScorer(query, field);
558    
559                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
560    
561                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
562    
563                    TokenStream tokenStream = getAnalyzer().tokenStream(
564                            field, new UnsyncStringReader(s));
565    
566                    try {
567                            String snippet = highlighter.getBestFragments(
568                                    tokenStream, s, maxNumFragments, fragmentSuffix);
569    
570                            if (Validator.isNotNull(snippet) &&
571                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
572                                    !s.equals(snippet)) {
573    
574                                    snippet = snippet.concat(fragmentSuffix);
575                            }
576    
577                            return snippet;
578                    }
579                    catch (InvalidTokenOffsetsException itoe) {
580                            throw new IOException(itoe.getMessage());
581                    }
582            }
583    
584            @Override
585            public Version getVersion() {
586                    return _version;
587            }
588    
589            @Override
590            public boolean isLoadIndexFromClusterEnabled() {
591                    if (PropsValues.CLUSTER_LINK_ENABLED &&
592                            PropsValues.LUCENE_REPLICATE_WRITE) {
593    
594                            return true;
595                    }
596    
597                    if (_log.isDebugEnabled()) {
598                            _log.debug("Load index from cluster is not enabled");
599                    }
600    
601                    return false;
602            }
603    
604            @Override
605            public void loadIndex(long companyId, InputStream inputStream)
606                    throws IOException {
607    
608                    if (!isLoadIndexFromClusterEnabled()) {
609                            return;
610                    }
611    
612                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
613    
614                    if (indexAccessor == null) {
615                            if (_log.isInfoEnabled()) {
616                                    _log.info(
617                                            "Skip loading Lucene index files for company " + companyId +
618                                                    " in favor of lazy loading");
619                            }
620    
621                            return;
622                    }
623    
624                    StopWatch stopWatch = new StopWatch();
625    
626                    stopWatch.start();
627    
628                    if (_log.isInfoEnabled()) {
629                            _log.info(
630                                    "Start loading Lucene index files for company " + companyId);
631                    }
632    
633                    indexAccessor.loadIndex(inputStream);
634    
635                    if (_log.isInfoEnabled()) {
636                            _log.info(
637                                    "Finished loading index files for company " + companyId +
638                                            " in " + stopWatch.getTime() + " ms");
639                    }
640            }
641    
642            @Override
643            public void loadIndexesFromCluster(long companyId) throws SystemException {
644                    if (!isLoadIndexFromClusterEnabled()) {
645                            return;
646                    }
647    
648                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
649    
650                    if (indexAccessor == null) {
651                            return;
652                    }
653    
654                    long localLastGeneration = getLastGeneration(companyId);
655    
656                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
657            }
658    
659            @Override
660            public void releaseIndexSearcher(
661                            long companyId, IndexSearcher indexSearcher)
662                    throws IOException {
663    
664                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
665    
666                    indexAccessor.releaseIndexSearcher(indexSearcher);
667            }
668    
669            public void setAnalyzer(Analyzer analyzer) {
670                    _analyzer = analyzer;
671            }
672    
673            public void setVersion(Version version) {
674                    _version = version;
675            }
676    
677            @Override
678            public void shutdown() {
679                    if (_luceneIndexThreadPoolExecutor != null) {
680                            _luceneIndexThreadPoolExecutor.shutdownNow();
681    
682                            try {
683                                    _luceneIndexThreadPoolExecutor.awaitTermination(
684                                            60, TimeUnit.SECONDS);
685                            }
686                            catch (InterruptedException ie) {
687                                    _log.error("Lucene indexer shutdown interrupted", ie);
688                            }
689                    }
690    
691                    if (isLoadIndexFromClusterEnabled()) {
692                            ClusterExecutorUtil.removeClusterEventListener(
693                                    _loadIndexClusterEventListener);
694                    }
695    
696                    MessageBus messageBus = MessageBusUtil.getMessageBus();
697    
698                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
699                            String searchWriterDestinationName =
700                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
701    
702                            Destination searchWriteDestination = messageBus.getDestination(
703                                    searchWriterDestinationName);
704    
705                            if (searchWriteDestination != null) {
706                                    ThreadPoolExecutor threadPoolExecutor =
707                                            PortalExecutorManagerUtil.getPortalExecutor(
708                                                    searchWriterDestinationName);
709    
710                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
711    
712                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
713    
714                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
715                                            countDownLatch);
716    
717                                    for (int i = 0; i < maxPoolSize; i++) {
718                                            threadPoolExecutor.submit(shutdownSyncJob);
719                                    }
720    
721                                    try {
722                                            countDownLatch.await();
723                                    }
724                                    catch (InterruptedException ie) {
725                                            _log.error("Shutdown waiting interrupted", ie);
726                                    }
727    
728                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
729    
730                                    if (_log.isDebugEnabled()) {
731                                            _log.debug(
732                                                    "Cancelled appending indexing jobs: " + runnables);
733                                    }
734    
735                                    searchWriteDestination.close(true);
736                            }
737                    }
738    
739                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
740                            indexAccessor.close();
741                    }
742            }
743    
744            @Override
745            public void shutdown(long companyId) {
746                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
747    
748                    _indexAccessors.remove(indexAccessor);
749    
750                    indexAccessor.close();
751            }
752    
753            @Override
754            public void startup(long companyId) {
755                    if (!PropsValues.INDEX_ON_STARTUP) {
756                            return;
757                    }
758    
759                    if (_log.isInfoEnabled()) {
760                            _log.info("Indexing Lucene on startup");
761                    }
762    
763                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
764    
765                    if (PropsValues.INDEX_WITH_THREAD) {
766                            if (_luceneIndexThreadPoolExecutor == null) {
767    
768                                    // This should never be null except for the case where
769                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
770                                    // PropsValues#INDEX_ON_STARTUP to true.
771    
772                                    _luceneIndexThreadPoolExecutor =
773                                            PortalExecutorManagerUtil.getPortalExecutor(
774                                                    LuceneHelperImpl.class.getName());
775                            }
776    
777                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
778                    }
779                    else {
780                            luceneIndexer.reindex();
781                    }
782            }
783    
784            @Override
785            public void updateDocument(long companyId, Term term, Document document)
786                    throws IOException {
787    
788                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
789    
790                    indexAccessor.updateDocument(term, document);
791            }
792    
793            private LuceneHelperImpl() {
794                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
795                            _luceneIndexThreadPoolExecutor =
796                                    PortalExecutorManagerUtil.getPortalExecutor(
797                                            LuceneHelperImpl.class.getName());
798                    }
799    
800                    if (isLoadIndexFromClusterEnabled()) {
801                            _loadIndexClusterEventListener =
802                                    new LoadIndexClusterEventListener();
803    
804                            ClusterExecutorUtil.addClusterEventListener(
805                                    _loadIndexClusterEventListener);
806                    }
807    
808                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
809            }
810    
811            private ObjectValuePair<String, URL>
812                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
813                    throws SystemException {
814    
815                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
816                            new MethodHandler(
817                                    _createTokenMethodKey,
818                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
819                            bootupAddress);
820    
821                    FutureClusterResponses futureClusterResponses =
822                            ClusterExecutorUtil.execute(clusterRequest);
823    
824                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
825                            futureClusterResponses.getPartialResults();
826    
827                    try {
828                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
829                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
830                                    TimeUnit.MILLISECONDS);
831    
832                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
833    
834                            int port = clusterNode.getPort();
835    
836                            if (port <= 0) {
837                                    StringBundler sb = new StringBundler(6);
838    
839                                    sb.append("Invalid cluster node port ");
840                                    sb.append(port);
841                                    sb.append(". The port is set by the first request or ");
842                                    sb.append("configured in portal.properties by the properties ");
843                                    sb.append("\"portal.instance.http.port\" and ");
844                                    sb.append("\"portal.instance.https.port\".");
845    
846                                    throw new Exception(sb.toString());
847                            }
848    
849                            String protocol = clusterNode.getPortalProtocol();
850    
851                            if (Validator.isNull(protocol)) {
852                                    StringBundler sb = new StringBundler(4);
853    
854                                    sb.append("Cluster node protocol is empty. The protocol is ");
855                                    sb.append("set by the first request or configured in ");
856                                    sb.append("portal.properties by the property ");
857                                    sb.append("\"portal.instance.protocol\"");
858    
859                                    throw new Exception(sb.toString());
860                            }
861    
862                            InetAddress inetAddress = clusterNode.getInetAddress();
863    
864                            String fileName = PortalUtil.getPathContext();
865    
866                            if (!fileName.endsWith(StringPool.SLASH)) {
867                                    fileName = fileName.concat(StringPool.SLASH);
868                            }
869    
870                            fileName = fileName.concat("lucene/dump");
871    
872                            URL url = new URL(
873                                    protocol, inetAddress.getHostAddress(), port, fileName);
874    
875                            String transientToken = (String)clusterNodeResponse.getResult();
876    
877                            return new ObjectValuePair<String, URL>(transientToken, url);
878                    }
879                    catch (Exception e) {
880                            throw new SystemException(e);
881                    }
882            }
883    
884            private void _handleFutureClusterResponses(
885                    FutureClusterResponses futureClusterResponses,
886                    IndexAccessor indexAccessor, int clusterNodeAddressesCount,
887                    long localLastGeneration) {
888    
889                    BlockingQueue<ClusterNodeResponse> blockingQueue =
890                            futureClusterResponses.getPartialResults();
891    
892                    long companyId = indexAccessor.getCompanyId();
893    
894                    Address bootupAddress = null;
895    
896                    do {
897                            clusterNodeAddressesCount--;
898    
899                            ClusterNodeResponse clusterNodeResponse = null;
900    
901                            try {
902                                    clusterNodeResponse = blockingQueue.poll(
903                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
904                                            TimeUnit.MILLISECONDS);
905                            }
906                            catch (Exception e) {
907                                    _log.error("Unable to get cluster node response", e);
908                            }
909    
910                            if (clusterNodeResponse == null) {
911                                    if (_log.isDebugEnabled()) {
912                                            _log.debug(
913                                                    "Unable to get cluster node response in " +
914                                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
915                                                                    TimeUnit.MILLISECONDS);
916                                    }
917    
918                                    continue;
919                            }
920    
921                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
922    
923                            if (clusterNode.getPort() > 0) {
924                                    try {
925                                            long remoteLastGeneration =
926                                                    (Long)clusterNodeResponse.getResult();
927    
928                                            if (remoteLastGeneration > localLastGeneration) {
929                                                    bootupAddress = clusterNodeResponse.getAddress();
930    
931                                                    break;
932                                            }
933                                    }
934                                    catch (Exception e) {
935                                            if (_log.isDebugEnabled()) {
936                                                    _log.debug(
937                                                            "Suppress exception caused by remote method " +
938                                                                    "invocation",
939                                                            e);
940                                            }
941    
942                                            continue;
943                                    }
944                            }
945                            else {
946                                    if (_log.isDebugEnabled()) {
947                                            _log.debug(
948                                                    "Cluster node " + clusterNode +
949                                                            " has invalid port");
950                                    }
951                            }
952                    }
953                    while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
954    
955                    if (bootupAddress == null) {
956                            return;
957                    }
958    
959                    if (_log.isInfoEnabled()) {
960                            _log.info(
961                                    "Start loading lucene index files from cluster node " +
962                                            bootupAddress);
963                    }
964    
965                    InputStream inputStream = null;
966    
967                    try {
968                            inputStream = getLoadIndexesInputStreamFromCluster(
969                                    companyId, bootupAddress);
970    
971                            indexAccessor.loadIndex(inputStream);
972    
973                            if (_log.isInfoEnabled()) {
974                                    _log.info("Lucene index files loaded successfully");
975                            }
976                    }
977                    catch (Exception e) {
978                            _log.error("Unable to load index for company " + companyId, e);
979                    }
980                    finally {
981                            if (inputStream != null) {
982                                    try {
983                                            inputStream.close();
984                                    }
985                                    catch (IOException ioe) {
986                                            _log.error(
987                                                    "Unable to close input stream for company " +
988                                                            companyId,
989                                                    ioe);
990                                    }
991                            }
992                    }
993            }
994    
995            private void _includeIfUnique(
996                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
997                    Query query, BooleanClause.Occur occur) {
998    
999                    if (query instanceof TermQuery) {
1000                            Set<Term> terms = new HashSet<Term>();
1001    
1002                            TermQuery termQuery = (TermQuery)query;
1003    
1004                            termQuery.extractTerms(terms);
1005    
1006                            float boost = termQuery.getBoost();
1007    
1008                            for (Term term : terms) {
1009                                    String termValue = term.text();
1010    
1011                                    if (like) {
1012                                            termValue = termValue.toLowerCase(queryParser.getLocale());
1013    
1014                                            term = term.createTerm(
1015                                                    StringPool.STAR.concat(termValue).concat(
1016                                                            StringPool.STAR));
1017    
1018                                            query = new WildcardQuery(term);
1019                                    }
1020                                    else {
1021                                            query = new TermQuery(term);
1022                                    }
1023    
1024                                    query.setBoost(boost);
1025    
1026                                    boolean included = false;
1027    
1028                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1029                                            if (query.equals(booleanClause.getQuery())) {
1030                                                    included = true;
1031                                            }
1032                                    }
1033    
1034                                    if (!included) {
1035                                            booleanQuery.add(query, occur);
1036                                    }
1037                            }
1038                    }
1039                    else if (query instanceof BooleanQuery) {
1040                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
1041    
1042                            BooleanQuery containerBooleanQuery = new BooleanQuery();
1043    
1044                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1045                                    _includeIfUnique(
1046                                            containerBooleanQuery, like, queryParser,
1047                                            booleanClause.getQuery(), booleanClause.getOccur());
1048                            }
1049    
1050                            if (containerBooleanQuery.getClauses().length > 0) {
1051                                    booleanQuery.add(containerBooleanQuery, occur);
1052                            }
1053                    }
1054                    else {
1055                            boolean included = false;
1056    
1057                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1058                                    if (query.equals(booleanClause.getQuery())) {
1059                                            included = true;
1060                                    }
1061                            }
1062    
1063                            if (!included) {
1064                                    booleanQuery.add(query, occur);
1065                            }
1066                    }
1067            }
1068    
1069            private void _loadIndexFromCluster(
1070                            IndexAccessor indexAccessor, long localLastGeneration)
1071                    throws SystemException {
1072    
1073                    List<Address> clusterNodeAddresses =
1074                            ClusterExecutorUtil.getClusterNodeAddresses();
1075    
1076                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
1077    
1078                    if (clusterNodeAddressesCount <= 1) {
1079                            if (_log.isDebugEnabled()) {
1080                                    _log.debug(
1081                                            "Do not load indexes because there is either one portal " +
1082                                                    "instance or no portal instances in the cluster");
1083                            }
1084    
1085                            return;
1086                    }
1087    
1088                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1089                            new MethodHandler(
1090                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1091                            true);
1092    
1093                    FutureClusterResponses futureClusterResponses =
1094                            ClusterExecutorUtil.execute(clusterRequest);
1095    
1096                    _handleFutureClusterResponses(
1097                            futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1098                            localLastGeneration);
1099            }
1100    
1101            private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
1102                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
1103    
1104            private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
1105                    GetterUtil.getInteger(
1106                            PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
1107                            BooleanQuery.getMaxClauseCount());
1108    
1109            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1110    
1111            private static MethodKey _createTokenMethodKey = new MethodKey(
1112                    TransientTokenUtil.class, "createToken", long.class);
1113            private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1114                    LuceneHelperUtil.class, "getLastGeneration", long.class);
1115    
1116            private Analyzer _analyzer;
1117            private Map<Long, IndexAccessor> _indexAccessors =
1118                    new ConcurrentHashMap<Long, IndexAccessor>();
1119            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1120            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1121            private Version _version;
1122    
1123            private static class ShutdownSyncJob implements Runnable {
1124    
1125                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1126                            _countDownLatch = countDownLatch;
1127                    }
1128    
1129                    @Override
1130                    public void run() {
1131                            _countDownLatch.countDown();
1132    
1133                            try {
1134                                    synchronized (this) {
1135                                            wait();
1136                                    }
1137                            }
1138                            catch (InterruptedException ie) {
1139                            }
1140                    }
1141    
1142                    private final CountDownLatch _countDownLatch;
1143    
1144            }
1145    
1146            private class LoadIndexClusterEventListener
1147                    implements ClusterEventListener {
1148    
1149                    @Override
1150                    public void processClusterEvent(ClusterEvent clusterEvent) {
1151                            ClusterEventType clusterEventType =
1152                                    clusterEvent.getClusterEventType();
1153    
1154                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1155                                    return;
1156                            }
1157    
1158                            List<Address> clusterNodeAddresses =
1159                                    ClusterExecutorUtil.getClusterNodeAddresses();
1160                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1161    
1162                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1163                                    if (_log.isDebugEnabled()) {
1164                                            _log.debug(
1165                                                    "Number of original cluster members is greater than " +
1166                                                            "one");
1167                                    }
1168    
1169                                    return;
1170                            }
1171    
1172                            long[] companyIds = PortalInstances.getCompanyIds();
1173    
1174                            for (long companyId : companyIds) {
1175                                    loadIndexes(companyId);
1176                            }
1177    
1178                            loadIndexes(CompanyConstants.SYSTEM);
1179                    }
1180    
1181                    private void loadIndexes(long companyId) {
1182                            long lastGeneration = getLastGeneration(companyId);
1183    
1184                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1185                                    return;
1186                            }
1187    
1188                            try {
1189                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1190                            }
1191                            catch (Exception e) {
1192                                    _log.error(
1193                                            "Unable to load indexes for company " + companyId, e);
1194                            }
1195                    }
1196    
1197            }
1198    
1199    }