001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Destination;
036    import com.liferay.portal.kernel.messaging.MessageBus;
037    import com.liferay.portal.kernel.messaging.MessageBusUtil;
038    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039    import com.liferay.portal.kernel.search.BooleanClauseOccur;
040    import com.liferay.portal.kernel.search.Field;
041    import com.liferay.portal.kernel.search.SearchEngineUtil;
042    import com.liferay.portal.kernel.util.ArrayUtil;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.Http;
045    import com.liferay.portal.kernel.util.MethodHandler;
046    import com.liferay.portal.kernel.util.MethodKey;
047    import com.liferay.portal.kernel.util.ObjectValuePair;
048    import com.liferay.portal.kernel.util.PropsKeys;
049    import com.liferay.portal.kernel.util.StringBundler;
050    import com.liferay.portal.kernel.util.StringPool;
051    import com.liferay.portal.kernel.util.StringUtil;
052    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053    import com.liferay.portal.kernel.util.Validator;
054    import com.liferay.portal.model.CompanyConstants;
055    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
056    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
057    import com.liferay.portal.security.auth.TransientTokenUtil;
058    import com.liferay.portal.util.PortalInstances;
059    import com.liferay.portal.util.PortalUtil;
060    import com.liferay.portal.util.PropsUtil;
061    import com.liferay.portal.util.PropsValues;
062    
063    import java.io.IOException;
064    import java.io.InputStream;
065    import java.io.OutputStream;
066    
067    import java.net.InetAddress;
068    import java.net.URL;
069    import java.net.URLConnection;
070    
071    import java.util.HashSet;
072    import java.util.List;
073    import java.util.Map;
074    import java.util.Set;
075    import java.util.concurrent.BlockingQueue;
076    import java.util.concurrent.ConcurrentHashMap;
077    import java.util.concurrent.CountDownLatch;
078    import java.util.concurrent.TimeUnit;
079    import java.util.concurrent.TimeoutException;
080    
081    import org.apache.commons.lang.time.StopWatch;
082    import org.apache.lucene.analysis.Analyzer;
083    import org.apache.lucene.analysis.TokenStream;
084    import org.apache.lucene.document.Document;
085    import org.apache.lucene.index.IndexReader;
086    import org.apache.lucene.index.Term;
087    import org.apache.lucene.queryParser.QueryParser;
088    import org.apache.lucene.search.BooleanClause;
089    import org.apache.lucene.search.BooleanQuery;
090    import org.apache.lucene.search.IndexSearcher;
091    import org.apache.lucene.search.NumericRangeQuery;
092    import org.apache.lucene.search.Query;
093    import org.apache.lucene.search.TermQuery;
094    import org.apache.lucene.search.TermRangeQuery;
095    import org.apache.lucene.search.WildcardQuery;
096    import org.apache.lucene.search.highlight.Formatter;
097    import org.apache.lucene.search.highlight.Highlighter;
098    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
099    import org.apache.lucene.search.highlight.QueryScorer;
100    import org.apache.lucene.search.highlight.SimpleFragmenter;
101    import org.apache.lucene.search.highlight.WeightedTerm;
102    import org.apache.lucene.util.Version;
103    
104    /**
105     * @author Brian Wing Shun Chan
106     * @author Harry Mark
107     * @author Bruno Farache
108     * @author Shuyang Zhou
109     * @author Tina Tian
110     * @author Hugo Huijser
111     * @author Andrea Di Giorgi
112     */
113    public class LuceneHelperImpl implements LuceneHelper {
114    
115            @Override
116            public void addDocument(long companyId, Document document)
117                    throws IOException {
118    
119                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
120    
121                    indexAccessor.addDocument(document);
122            }
123    
124            @Override
125            public void addExactTerm(
126                    BooleanQuery booleanQuery, String field, String value) {
127    
128                    addTerm(booleanQuery, field, value, false);
129            }
130    
131            @Override
132            public void addNumericRangeTerm(
133                    BooleanQuery booleanQuery, String field, Integer startValue,
134                    Integer endValue) {
135    
136                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
137                            field, startValue, endValue, true, true);
138    
139                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
140            }
141    
142            @Override
143            public void addNumericRangeTerm(
144                    BooleanQuery booleanQuery, String field, Long startValue,
145                    Long endValue) {
146    
147                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
148                            field, startValue, endValue, true, true);
149    
150                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
151            }
152    
153            /**
154             * @deprecated As of 6.2.0, replaced by {@link
155             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
156             */
157            @Override
158            public void addNumericRangeTerm(
159                    BooleanQuery booleanQuery, String field, String startValue,
160                    String endValue) {
161    
162                    addNumericRangeTerm(
163                            booleanQuery, field, GetterUtil.getLong(startValue),
164                            GetterUtil.getLong(endValue));
165            }
166    
167            @Override
168            public void addRangeTerm(
169                    BooleanQuery booleanQuery, String field, String startValue,
170                    String endValue) {
171    
172                    boolean includesLower = true;
173    
174                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
175                            includesLower = false;
176                    }
177    
178                    boolean includesUpper = true;
179    
180                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
181                            includesUpper = false;
182                    }
183    
184                    TermRangeQuery termRangeQuery = new TermRangeQuery(
185                            field, startValue, endValue, includesLower, includesUpper);
186    
187                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
188            }
189    
190            @Override
191            public void addRequiredTerm(
192                    BooleanQuery booleanQuery, String field, String value, boolean like) {
193    
194                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
195            }
196    
197            @Override
198            public void addRequiredTerm(
199                    BooleanQuery booleanQuery, String field, String[] values,
200                    boolean like) {
201    
202                    if (values == null) {
203                            return;
204                    }
205    
206                    BooleanQuery query = new BooleanQuery();
207    
208                    for (String value : values) {
209                            addTerm(query, field, value, like);
210                    }
211    
212                    booleanQuery.add(query, BooleanClause.Occur.MUST);
213            }
214    
215            @Override
216            public void addTerm(
217                    BooleanQuery booleanQuery, String field, String value, boolean like) {
218    
219                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
220            }
221    
222            @Override
223            public void addTerm(
224                    BooleanQuery booleanQuery, String field, String value, boolean like,
225                    BooleanClauseOccur booleanClauseOccur) {
226    
227                    if (Validator.isNull(value)) {
228                            return;
229                    }
230    
231                    Analyzer analyzer = getAnalyzer();
232    
233                    if (analyzer instanceof PerFieldAnalyzer) {
234                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
235    
236                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
237    
238                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
239                                    like = true;
240                            }
241                    }
242    
243                    if (like) {
244                            value = StringUtil.replace(
245                                    value, StringPool.PERCENT, StringPool.BLANK);
246                    }
247    
248                    try {
249                            QueryParser queryParser = new QueryParser(
250                                    getVersion(), field, analyzer);
251    
252                            Query query = queryParser.parse(value);
253    
254                            BooleanClause.Occur occur = null;
255    
256                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
257                                    occur = BooleanClause.Occur.MUST;
258                            }
259                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
260                                    occur = BooleanClause.Occur.MUST_NOT;
261                            }
262                            else {
263                                    occur = BooleanClause.Occur.SHOULD;
264                            }
265    
266                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
267                    }
268                    catch (Exception e) {
269                            if (_log.isWarnEnabled()) {
270                                    _log.warn(e, e);
271                            }
272                    }
273            }
274    
275            @Override
276            public void addTerm(
277                    BooleanQuery booleanQuery, String field, String[] values,
278                    boolean like) {
279    
280                    for (String value : values) {
281                            addTerm(booleanQuery, field, value, like);
282                    }
283            }
284    
285            /**
286             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
287             *             IndexSearcher)}
288             */
289            @Deprecated
290            @Override
291            public void cleanUp(IndexSearcher indexSearcher) {
292                    if (indexSearcher == null) {
293                            return;
294                    }
295    
296                    try {
297                            indexSearcher.close();
298    
299                            IndexReader indexReader = indexSearcher.getIndexReader();
300    
301                            if (indexReader != null) {
302                                    indexReader.close();
303                            }
304                    }
305                    catch (IOException ioe) {
306                            _log.error(ioe, ioe);
307                    }
308            }
309    
310            @Override
311            public int countScoredFieldNames(Query query, String[] filedNames) {
312                    int count = 0;
313    
314                    for (String fieldName : filedNames) {
315                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
316                                    query, false, fieldName);
317    
318                            if ((weightedTerms.length > 0) &&
319                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
320    
321                                    count++;
322                            }
323                    }
324    
325                    return count;
326            }
327    
328            @Override
329            public void delete(long companyId) {
330                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
331    
332                    if (indexAccessor == null) {
333                            return;
334                    }
335    
336                    indexAccessor.delete();
337            }
338    
339            @Override
340            public void deleteDocuments(long companyId, Term term) throws IOException {
341                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342    
343                    if (indexAccessor == null) {
344                            return;
345                    }
346    
347                    indexAccessor.deleteDocuments(term);
348            }
349    
350            @Override
351            public void dumpIndex(long companyId, OutputStream outputStream)
352                    throws IOException {
353    
354                    long lastGeneration = getLastGeneration(companyId);
355    
356                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
357                            if (_log.isDebugEnabled()) {
358                                    _log.debug(
359                                            "Dump index from cluster is not enabled for " + companyId);
360                            }
361    
362                            return;
363                    }
364    
365                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
366    
367                    if (indexAccessor == null) {
368                            return;
369                    }
370    
371                    indexAccessor.dumpIndex(outputStream);
372            }
373    
374            @Override
375            public Analyzer getAnalyzer() {
376                    return _analyzer;
377            }
378    
379            @Override
380            public IndexAccessor getIndexAccessor(long companyId) {
381                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
382    
383                    if (indexAccessor != null) {
384                            return indexAccessor;
385                    }
386    
387                    synchronized (this) {
388                            indexAccessor = _indexAccessors.get(companyId);
389    
390                            if (indexAccessor == null) {
391                                    indexAccessor = new IndexAccessorImpl(companyId);
392    
393                                    if (isLoadIndexFromClusterEnabled()) {
394                                            indexAccessor = new SynchronizedIndexAccessorImpl(
395                                                    indexAccessor);
396    
397                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
398                                                    MessageValuesThreadLocal.getValue(
399                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
400    
401                                            if (clusterForwardMessage) {
402                                                    if (_log.isInfoEnabled()) {
403                                                            _log.info(
404                                                                    "Skip Luncene index files cluster loading " +
405                                                                            "since this is a manual reindex request");
406                                                    }
407                                            }
408                                            else {
409                                                    try {
410                                                            _loadIndexFromCluster(
411                                                                    indexAccessor,
412                                                                    indexAccessor.getLastGeneration());
413                                                    }
414                                                    catch (Exception e) {
415                                                            _log.error(
416                                                                    "Unable to load index for company " +
417                                                                            indexAccessor.getCompanyId(),
418                                                                    e);
419                                                    }
420                                            }
421                                    }
422    
423                                    _indexAccessors.put(companyId, indexAccessor);
424                            }
425                    }
426    
427                    return indexAccessor;
428            }
429    
430            @Override
431            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
432                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
433    
434                    return indexAccessor.acquireIndexSearcher();
435            }
436    
437            @Override
438            public long getLastGeneration(long companyId) {
439                    if (!isLoadIndexFromClusterEnabled()) {
440                            return IndexAccessor.DEFAULT_LAST_GENERATION;
441                    }
442    
443                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
444    
445                    if (indexAccessor == null) {
446                            return IndexAccessor.DEFAULT_LAST_GENERATION;
447                    }
448    
449                    return indexAccessor.getLastGeneration();
450            }
451    
452            @Override
453            public InputStream getLoadIndexesInputStreamFromCluster(
454                            long companyId, Address bootupAddress)
455                    throws SystemException {
456    
457                    if (!isLoadIndexFromClusterEnabled()) {
458                            return null;
459                    }
460    
461                    InputStream inputStream = null;
462    
463                    try {
464                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
465                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
466    
467                            URL url = bootupClusterNodeObjectValuePair.getValue();
468    
469                            URLConnection urlConnection = url.openConnection();
470    
471                            urlConnection.setDoOutput(true);
472    
473                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
474                                    urlConnection.getOutputStream());
475    
476                            unsyncPrintWriter.write("transientToken=");
477                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
478                            unsyncPrintWriter.write("&companyId=");
479                            unsyncPrintWriter.write(String.valueOf(companyId));
480    
481                            unsyncPrintWriter.close();
482    
483                            inputStream = urlConnection.getInputStream();
484    
485                            return inputStream;
486                    }
487                    catch (IOException ioe) {
488                            throw new SystemException(ioe);
489                    }
490            }
491    
492            @Override
493            public Set<String> getQueryTerms(Query query) {
494                    String queryString = StringUtil.replace(
495                            query.toString(), StringPool.STAR, StringPool.BLANK);
496    
497                    Query tempQuery = null;
498    
499                    try {
500                            QueryParser queryParser = new QueryParser(
501                                    getVersion(), StringPool.BLANK, getAnalyzer());
502    
503                            tempQuery = queryParser.parse(queryString);
504                    }
505                    catch (Exception e) {
506                            if (_log.isWarnEnabled()) {
507                                    _log.warn("Unable to parse " + queryString);
508                            }
509    
510                            tempQuery = query;
511                    }
512    
513                    WeightedTerm[] weightedTerms = null;
514    
515                    for (String fieldName : Field.KEYWORDS) {
516                            weightedTerms = QueryTermExtractor.getTerms(
517                                    tempQuery, false, fieldName);
518    
519                            if (weightedTerms.length > 0) {
520                                    break;
521                            }
522                    }
523    
524                    Set<String> queryTerms = new HashSet<String>();
525    
526                    for (WeightedTerm weightedTerm : weightedTerms) {
527                            queryTerms.add(weightedTerm.getTerm());
528                    }
529    
530                    return queryTerms;
531            }
532    
533            /**
534             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
535             */
536            @Deprecated
537            @Override
538            public IndexSearcher getSearcher(long companyId, boolean readOnly)
539                    throws IOException {
540    
541                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
542    
543                    IndexReader indexReader = IndexReader.open(
544                            indexAccessor.getLuceneDir(), readOnly);
545    
546                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
547    
548                    indexSearcher.setDefaultFieldSortScoring(true, false);
549                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
550    
551                    return indexSearcher;
552            }
553    
554            @Override
555            public String getSnippet(
556                            Query query, String field, String s, int maxNumFragments,
557                            int fragmentLength, String fragmentSuffix, Formatter formatter)
558                    throws IOException {
559    
560                    QueryScorer queryScorer = new QueryScorer(query, field);
561    
562                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
563    
564                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
565    
566                    TokenStream tokenStream = getAnalyzer().tokenStream(
567                            field, new UnsyncStringReader(s));
568    
569                    try {
570                            String snippet = highlighter.getBestFragments(
571                                    tokenStream, s, maxNumFragments, fragmentSuffix);
572    
573                            if (Validator.isNotNull(snippet) &&
574                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
575                                    !s.equals(snippet)) {
576    
577                                    snippet = snippet.concat(fragmentSuffix);
578                            }
579    
580                            return snippet;
581                    }
582                    catch (InvalidTokenOffsetsException itoe) {
583                            throw new IOException(itoe.getMessage());
584                    }
585            }
586    
587            @Override
588            public Version getVersion() {
589                    return _version;
590            }
591    
592            @Override
593            public boolean isLoadIndexFromClusterEnabled() {
594                    if (PropsValues.CLUSTER_LINK_ENABLED &&
595                            PropsValues.LUCENE_REPLICATE_WRITE) {
596    
597                            return true;
598                    }
599    
600                    if (_log.isDebugEnabled()) {
601                            _log.debug("Load index from cluster is not enabled");
602                    }
603    
604                    return false;
605            }
606    
607            @Override
608            public void loadIndex(long companyId, InputStream inputStream)
609                    throws IOException {
610    
611                    if (!isLoadIndexFromClusterEnabled()) {
612                            return;
613                    }
614    
615                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
616    
617                    if (indexAccessor == null) {
618                            if (_log.isInfoEnabled()) {
619                                    _log.info(
620                                            "Skip loading Lucene index files for company " + companyId +
621                                                    " in favor of lazy loading");
622                            }
623    
624                            return;
625                    }
626    
627                    StopWatch stopWatch = new StopWatch();
628    
629                    stopWatch.start();
630    
631                    if (_log.isInfoEnabled()) {
632                            _log.info(
633                                    "Start loading Lucene index files for company " + companyId);
634                    }
635    
636                    indexAccessor.loadIndex(inputStream);
637    
638                    if (_log.isInfoEnabled()) {
639                            _log.info(
640                                    "Finished loading index files for company " + companyId +
641                                            " in " + stopWatch.getTime() + " ms");
642                    }
643            }
644    
645            @Override
646            public void loadIndexesFromCluster(long companyId) throws SystemException {
647                    if (!isLoadIndexFromClusterEnabled()) {
648                            return;
649                    }
650    
651                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652    
653                    if (indexAccessor == null) {
654                            return;
655                    }
656    
657                    long localLastGeneration = getLastGeneration(companyId);
658    
659                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
660            }
661    
662            @Override
663            public void releaseIndexSearcher(
664                            long companyId, IndexSearcher indexSearcher)
665                    throws IOException {
666    
667                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
668    
669                    indexAccessor.releaseIndexSearcher(indexSearcher);
670            }
671    
672            public void setAnalyzer(Analyzer analyzer) {
673                    _analyzer = analyzer;
674            }
675    
676            public void setVersion(Version version) {
677                    _version = version;
678            }
679    
680            @Override
681            public void shutdown() {
682                    if (_luceneIndexThreadPoolExecutor != null) {
683                            _luceneIndexThreadPoolExecutor.shutdownNow();
684    
685                            try {
686                                    _luceneIndexThreadPoolExecutor.awaitTermination(
687                                            60, TimeUnit.SECONDS);
688                            }
689                            catch (InterruptedException ie) {
690                                    _log.error("Lucene indexer shutdown interrupted", ie);
691                            }
692                    }
693    
694                    if (isLoadIndexFromClusterEnabled()) {
695                            ClusterExecutorUtil.removeClusterEventListener(
696                                    _loadIndexClusterEventListener);
697                    }
698    
699                    MessageBus messageBus = MessageBusUtil.getMessageBus();
700    
701                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
702                            String searchWriterDestinationName =
703                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
704    
705                            Destination searchWriteDestination = messageBus.getDestination(
706                                    searchWriterDestinationName);
707    
708                            if (searchWriteDestination != null) {
709                                    ThreadPoolExecutor threadPoolExecutor =
710                                            PortalExecutorManagerUtil.getPortalExecutor(
711                                                    searchWriterDestinationName);
712    
713                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
714    
715                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
716    
717                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
718                                            countDownLatch);
719    
720                                    for (int i = 0; i < maxPoolSize; i++) {
721                                            threadPoolExecutor.submit(shutdownSyncJob);
722                                    }
723    
724                                    try {
725                                            countDownLatch.await();
726                                    }
727                                    catch (InterruptedException ie) {
728                                            _log.error("Shutdown waiting interrupted", ie);
729                                    }
730    
731                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
732    
733                                    if (_log.isDebugEnabled()) {
734                                            _log.debug(
735                                                    "Cancelled appending indexing jobs: " + runnables);
736                                    }
737    
738                                    searchWriteDestination.close(true);
739                            }
740                    }
741    
742                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
743                            indexAccessor.close();
744                    }
745            }
746    
747            @Override
748            public void shutdown(long companyId) {
749                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
750    
751                    _indexAccessors.remove(indexAccessor);
752    
753                    indexAccessor.close();
754            }
755    
756            @Override
757            public void startup(long companyId) {
758                    if (!PropsValues.INDEX_ON_STARTUP) {
759                            return;
760                    }
761    
762                    if (_log.isInfoEnabled()) {
763                            _log.info("Indexing Lucene on startup");
764                    }
765    
766                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
767    
768                    if (PropsValues.INDEX_WITH_THREAD) {
769                            if (_luceneIndexThreadPoolExecutor == null) {
770    
771                                    // This should never be null except for the case where
772                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
773                                    // PropsValues#INDEX_ON_STARTUP to true.
774    
775                                    _luceneIndexThreadPoolExecutor =
776                                            PortalExecutorManagerUtil.getPortalExecutor(
777                                                    LuceneHelperImpl.class.getName());
778                            }
779    
780                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
781                    }
782                    else {
783                            luceneIndexer.reindex();
784                    }
785            }
786    
787            @Override
788            public void updateDocument(long companyId, Term term, Document document)
789                    throws IOException {
790    
791                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
792    
793                    indexAccessor.updateDocument(term, document);
794            }
795    
796            private LuceneHelperImpl() {
797                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
798                            _luceneIndexThreadPoolExecutor =
799                                    PortalExecutorManagerUtil.getPortalExecutor(
800                                            LuceneHelperImpl.class.getName());
801                    }
802    
803                    if (isLoadIndexFromClusterEnabled()) {
804                            _loadIndexClusterEventListener =
805                                    new LoadIndexClusterEventListener();
806    
807                            ClusterExecutorUtil.addClusterEventListener(
808                                    _loadIndexClusterEventListener);
809                    }
810    
811                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
812    
813                    if (StringUtil.equalsIgnoreCase(
814                                    Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
815    
816                            _protocol = Http.HTTPS;
817                    }
818                    else {
819                            _protocol = Http.HTTP;
820                    }
821            }
822    
823            private ObjectValuePair<String, URL>
824                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
825                    throws SystemException {
826    
827                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
828                            new MethodHandler(
829                                    _createTokenMethodKey,
830                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
831                            bootupAddress);
832    
833                    FutureClusterResponses futureClusterResponses =
834                            ClusterExecutorUtil.execute(clusterRequest);
835    
836                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
837                            futureClusterResponses.getPartialResults();
838    
839                    try {
840                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
841                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
842                                    TimeUnit.MILLISECONDS);
843    
844                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
845    
846                            int port = clusterNode.getPort();
847    
848                            if (port <= 0) {
849                                    StringBundler sb = new StringBundler(6);
850    
851                                    sb.append("Invalid cluster node port ");
852                                    sb.append(port);
853                                    sb.append(". The port is set by the first request or ");
854                                    sb.append("configured in portal.properties by the properties ");
855                                    sb.append("\"portal.instance.http.port\" and ");
856                                    sb.append("\"portal.instance.https.port\".");
857    
858                                    throw new Exception(sb.toString());
859                            }
860    
861                            InetAddress inetAddress = clusterNode.getInetAddress();
862    
863                            String fileName = PortalUtil.getPathContext();
864    
865                            if (!fileName.endsWith(StringPool.SLASH)) {
866                                    fileName = fileName.concat(StringPool.SLASH);
867                            }
868    
869                            fileName = fileName.concat("lucene/dump");
870    
871                            URL url = new URL(
872                                    _protocol, inetAddress.getHostAddress(), port, fileName);
873    
874                            String transientToken = (String)clusterNodeResponse.getResult();
875    
876                            return new ObjectValuePair<String, URL>(transientToken, url);
877                    }
878                    catch (Exception e) {
879                            throw new SystemException(e);
880                    }
881            }
882    
883            private void _includeIfUnique(
884                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
885                    Query query, BooleanClause.Occur occur) {
886    
887                    if (query instanceof TermQuery) {
888                            Set<Term> terms = new HashSet<Term>();
889    
890                            TermQuery termQuery = (TermQuery)query;
891    
892                            termQuery.extractTerms(terms);
893    
894                            float boost = termQuery.getBoost();
895    
896                            for (Term term : terms) {
897                                    String termValue = term.text();
898    
899                                    if (like) {
900                                            termValue = termValue.toLowerCase(queryParser.getLocale());
901    
902                                            term = term.createTerm(
903                                                    StringPool.STAR.concat(termValue).concat(
904                                                            StringPool.STAR));
905    
906                                            query = new WildcardQuery(term);
907                                    }
908                                    else {
909                                            query = new TermQuery(term);
910                                    }
911    
912                                    query.setBoost(boost);
913    
914                                    boolean included = false;
915    
916                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
917                                            if (query.equals(booleanClause.getQuery())) {
918                                                    included = true;
919                                            }
920                                    }
921    
922                                    if (!included) {
923                                            booleanQuery.add(query, occur);
924                                    }
925                            }
926                    }
927                    else if (query instanceof BooleanQuery) {
928                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
929    
930                            BooleanQuery containerBooleanQuery = new BooleanQuery();
931    
932                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
933                                    _includeIfUnique(
934                                            containerBooleanQuery, like, queryParser,
935                                            booleanClause.getQuery(), booleanClause.getOccur());
936                            }
937    
938                            if (containerBooleanQuery.getClauses().length > 0) {
939                                    booleanQuery.add(containerBooleanQuery, occur);
940                            }
941                    }
942                    else {
943                            boolean included = false;
944    
945                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
946                                    if (query.equals(booleanClause.getQuery())) {
947                                            included = true;
948                                    }
949                            }
950    
951                            if (!included) {
952                                    booleanQuery.add(query, occur);
953                            }
954                    }
955            }
956    
957            private void _loadIndexFromCluster(
958                            IndexAccessor indexAccessor, long localLastGeneration)
959                    throws SystemException {
960    
961                    List<Address> clusterNodeAddresses =
962                            ClusterExecutorUtil.getClusterNodeAddresses();
963    
964                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
965    
966                    if (clusterNodeAddressesCount <= 1) {
967                            if (_log.isDebugEnabled()) {
968                                    _log.debug(
969                                            "Do not load indexes because there is either one portal " +
970                                                    "instance or no portal instances in the cluster");
971                            }
972    
973                            return;
974                    }
975    
976                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
977                            new MethodHandler(
978                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
979                            true);
980    
981                    ClusterExecutorUtil.execute(
982                            clusterRequest,
983                            new LoadIndexClusterResponseCallback(
984                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
985            }
986    
987            private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
988                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
989    
990            private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
991                    GetterUtil.getInteger(
992                            PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
993                            BooleanQuery.getMaxClauseCount());
994    
995            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
996    
997            private static MethodKey _createTokenMethodKey = new MethodKey(
998                    TransientTokenUtil.class, "createToken", long.class);
999            private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1000                    LuceneHelperUtil.class, "getLastGeneration", long.class);
1001    
1002            private Analyzer _analyzer;
1003            private Map<Long, IndexAccessor> _indexAccessors =
1004                    new ConcurrentHashMap<Long, IndexAccessor>();
1005            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1006            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1007            private String _protocol;
1008            private Version _version;
1009    
1010            private static class ShutdownSyncJob implements Runnable {
1011    
1012                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1013                            _countDownLatch = countDownLatch;
1014                    }
1015    
1016                    @Override
1017                    public void run() {
1018                            _countDownLatch.countDown();
1019    
1020                            try {
1021                                    synchronized (this) {
1022                                            wait();
1023                                    }
1024                            }
1025                            catch (InterruptedException ie) {
1026                            }
1027                    }
1028    
1029                    private final CountDownLatch _countDownLatch;
1030    
1031            }
1032    
1033            private class LoadIndexClusterEventListener
1034                    implements ClusterEventListener {
1035    
1036                    @Override
1037                    public void processClusterEvent(ClusterEvent clusterEvent) {
1038                            ClusterEventType clusterEventType =
1039                                    clusterEvent.getClusterEventType();
1040    
1041                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1042                                    return;
1043                            }
1044    
1045                            List<Address> clusterNodeAddresses =
1046                                    ClusterExecutorUtil.getClusterNodeAddresses();
1047                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1048    
1049                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1050                                    if (_log.isDebugEnabled()) {
1051                                            _log.debug(
1052                                                    "Number of original cluster members is greater than " +
1053                                                            "one");
1054                                    }
1055    
1056                                    return;
1057                            }
1058    
1059                            long[] companyIds = PortalInstances.getCompanyIds();
1060    
1061                            for (long companyId : companyIds) {
1062                                    loadIndexes(companyId);
1063                            }
1064    
1065                            loadIndexes(CompanyConstants.SYSTEM);
1066                    }
1067    
1068                    private void loadIndexes(long companyId) {
1069                            long lastGeneration = getLastGeneration(companyId);
1070    
1071                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1072                                    return;
1073                            }
1074    
1075                            try {
1076                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1077                            }
1078                            catch (Exception e) {
1079                                    _log.error(
1080                                            "Unable to load indexes for company " + companyId, e);
1081                            }
1082                    }
1083    
1084            }
1085    
1086            private class LoadIndexClusterResponseCallback
1087                    extends BaseClusterResponseCallback {
1088    
1089                    public LoadIndexClusterResponseCallback(
1090                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1091                            long localLastGeneration) {
1092    
1093                            _indexAccessor = indexAccessor;
1094                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
1095                            _localLastGeneration = localLastGeneration;
1096    
1097                            _companyId = _indexAccessor.getCompanyId();
1098                    }
1099    
1100                    @Override
1101                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1102                            Address bootupAddress = null;
1103    
1104                            do {
1105                                    _clusterNodeAddressesCount--;
1106    
1107                                    ClusterNodeResponse clusterNodeResponse = null;
1108    
1109                                    try {
1110                                            clusterNodeResponse = blockingQueue.poll(
1111                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1112                                                    TimeUnit.MILLISECONDS);
1113                                    }
1114                                    catch (Exception e) {
1115                                            _log.error("Unable to get cluster node response", e);
1116                                    }
1117    
1118                                    if (clusterNodeResponse == null) {
1119                                            if (_log.isDebugEnabled()) {
1120                                                    _log.debug(
1121                                                            "Unable to get cluster node response in " +
1122                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1123                                                                            TimeUnit.MILLISECONDS);
1124                                            }
1125    
1126                                            continue;
1127                                    }
1128    
1129                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1130    
1131                                    if (clusterNode.getPort() > 0) {
1132                                            try {
1133                                                    long remoteLastGeneration =
1134                                                            (Long)clusterNodeResponse.getResult();
1135    
1136                                                    if (remoteLastGeneration > _localLastGeneration) {
1137                                                            bootupAddress = clusterNodeResponse.getAddress();
1138    
1139                                                            break;
1140                                                    }
1141                                            }
1142                                            catch (Exception e) {
1143                                                    if (_log.isDebugEnabled()) {
1144                                                            _log.debug(
1145                                                                    "Suppress exception caused by remote method " +
1146                                                                            "invocation",
1147                                                                    e);
1148                                                    }
1149    
1150                                                    continue;
1151                                            }
1152                                    }
1153                                    else {
1154                                            if (_log.isDebugEnabled()) {
1155                                                    _log.debug(
1156                                                            "Cluster node " + clusterNode +
1157                                                                    " has invalid port");
1158                                            }
1159                                    }
1160                            }
1161                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1162    
1163                            if (bootupAddress == null) {
1164                                    return;
1165                            }
1166    
1167                            if (_log.isInfoEnabled()) {
1168                                    _log.info(
1169                                            "Start loading lucene index files from cluster node " +
1170                                                    bootupAddress);
1171                            }
1172    
1173                            InputStream inputStream = null;
1174    
1175                            try {
1176                                    inputStream = getLoadIndexesInputStreamFromCluster(
1177                                            _companyId, bootupAddress);
1178    
1179                                    _indexAccessor.loadIndex(inputStream);
1180    
1181                                    if (_log.isInfoEnabled()) {
1182                                            _log.info("Lucene index files loaded successfully");
1183                                    }
1184                            }
1185                            catch (Exception e) {
1186                                    _log.error("Unable to load index for company " + _companyId, e);
1187                            }
1188                            finally {
1189                                    if (inputStream != null) {
1190                                            try {
1191                                                    inputStream.close();
1192                                            }
1193                                            catch (IOException ioe) {
1194                                                    _log.error(
1195                                                            "Unable to close input stream for company " +
1196                                                                    _companyId,
1197                                                            ioe);
1198                                            }
1199                                    }
1200                            }
1201                    }
1202    
1203                    @Override
1204                    public void processTimeoutException(TimeoutException timeoutException) {
1205                            _log.error(
1206                                    "Unable to load index for company " + _companyId,
1207                                    timeoutException);
1208                    }
1209    
1210                    private int _clusterNodeAddressesCount;
1211                    private long _companyId;
1212                    private IndexAccessor _indexAccessor;
1213                    private long _localLastGeneration;
1214    
1215            }
1216    
1217    }