001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLink;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
030    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
031    import com.liferay.portal.kernel.exception.SystemException;
032    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
033    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
034    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
035    import com.liferay.portal.kernel.log.Log;
036    import com.liferay.portal.kernel.log.LogFactoryUtil;
037    import com.liferay.portal.kernel.messaging.Destination;
038    import com.liferay.portal.kernel.messaging.MessageBus;
039    import com.liferay.portal.kernel.messaging.MessageBusUtil;
040    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
041    import com.liferay.portal.kernel.search.BooleanClauseOccur;
042    import com.liferay.portal.kernel.search.Field;
043    import com.liferay.portal.kernel.search.SearchEngineUtil;
044    import com.liferay.portal.kernel.util.ArrayUtil;
045    import com.liferay.portal.kernel.util.GetterUtil;
046    import com.liferay.portal.kernel.util.Http;
047    import com.liferay.portal.kernel.util.MethodHandler;
048    import com.liferay.portal.kernel.util.MethodKey;
049    import com.liferay.portal.kernel.util.ObjectValuePair;
050    import com.liferay.portal.kernel.util.PropsKeys;
051    import com.liferay.portal.kernel.util.StringBundler;
052    import com.liferay.portal.kernel.util.StringPool;
053    import com.liferay.portal.kernel.util.StringUtil;
054    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
055    import com.liferay.portal.kernel.util.Validator;
056    import com.liferay.portal.model.CompanyConstants;
057    import com.liferay.portal.search.SearchEngineInitializer;
058    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
059    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
060    import com.liferay.portal.security.auth.TransientTokenUtil;
061    import com.liferay.portal.util.PortalInstances;
062    import com.liferay.portal.util.PortalUtil;
063    import com.liferay.portal.util.PropsUtil;
064    import com.liferay.portal.util.PropsValues;
065    
066    import java.io.IOException;
067    import java.io.InputStream;
068    import java.io.OutputStream;
069    
070    import java.net.InetAddress;
071    import java.net.InetSocketAddress;
072    import java.net.URL;
073    import java.net.URLConnection;
074    
075    import java.util.HashSet;
076    import java.util.List;
077    import java.util.Map;
078    import java.util.Set;
079    import java.util.concurrent.BlockingQueue;
080    import java.util.concurrent.ConcurrentHashMap;
081    import java.util.concurrent.CountDownLatch;
082    import java.util.concurrent.Future;
083    import java.util.concurrent.TimeUnit;
084    
085    import org.apache.commons.lang.time.StopWatch;
086    import org.apache.lucene.analysis.Analyzer;
087    import org.apache.lucene.analysis.TokenStream;
088    import org.apache.lucene.document.Document;
089    import org.apache.lucene.index.IndexReader;
090    import org.apache.lucene.index.Term;
091    import org.apache.lucene.queryParser.QueryParser;
092    import org.apache.lucene.search.BooleanClause;
093    import org.apache.lucene.search.BooleanQuery;
094    import org.apache.lucene.search.IndexSearcher;
095    import org.apache.lucene.search.NumericRangeQuery;
096    import org.apache.lucene.search.Query;
097    import org.apache.lucene.search.TermQuery;
098    import org.apache.lucene.search.TermRangeQuery;
099    import org.apache.lucene.search.WildcardQuery;
100    import org.apache.lucene.search.highlight.Formatter;
101    import org.apache.lucene.search.highlight.Highlighter;
102    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
103    import org.apache.lucene.search.highlight.QueryScorer;
104    import org.apache.lucene.search.highlight.SimpleFragmenter;
105    import org.apache.lucene.search.highlight.WeightedTerm;
106    import org.apache.lucene.util.Version;
107    
108    /**
109     * @author Brian Wing Shun Chan
110     * @author Harry Mark
111     * @author Bruno Farache
112     * @author Shuyang Zhou
113     * @author Tina Tian
114     * @author Hugo Huijser
115     * @author Andrea Di Giorgi
116     */
117    public class LuceneHelperImpl implements LuceneHelper {
118    
119            @Override
120            public void addDocument(long companyId, Document document)
121                    throws IOException {
122    
123                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
124    
125                    indexAccessor.addDocument(document);
126            }
127    
128            @Override
129            public void addExactTerm(
130                    BooleanQuery booleanQuery, String field, String value) {
131    
132                    addTerm(booleanQuery, field, value, false);
133            }
134    
135            @Override
136            public void addNumericRangeTerm(
137                    BooleanQuery booleanQuery, String field, Integer startValue,
138                    Integer endValue) {
139    
140                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
141                            field, startValue, endValue, true, true);
142    
143                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
144            }
145    
146            @Override
147            public void addNumericRangeTerm(
148                    BooleanQuery booleanQuery, String field, Long startValue,
149                    Long endValue) {
150    
151                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
152                            field, startValue, endValue, true, true);
153    
154                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
155            }
156    
157            /**
158             * @deprecated As of 6.2.0, replaced by {@link
159             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
160             */
161            @Deprecated
162            @Override
163            public void addNumericRangeTerm(
164                    BooleanQuery booleanQuery, String field, String startValue,
165                    String endValue) {
166    
167                    addNumericRangeTerm(
168                            booleanQuery, field, GetterUtil.getLong(startValue),
169                            GetterUtil.getLong(endValue));
170            }
171    
172            @Override
173            public void addRangeTerm(
174                    BooleanQuery booleanQuery, String field, String startValue,
175                    String endValue) {
176    
177                    boolean includesLower = true;
178    
179                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
180                            includesLower = false;
181                    }
182    
183                    boolean includesUpper = true;
184    
185                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
186                            includesUpper = false;
187                    }
188    
189                    TermRangeQuery termRangeQuery = new TermRangeQuery(
190                            field, startValue, endValue, includesLower, includesUpper);
191    
192                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
193            }
194    
195            @Override
196            public void addRequiredTerm(
197                    BooleanQuery booleanQuery, String field, String value, boolean like) {
198    
199                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
200            }
201    
202            @Override
203            public void addRequiredTerm(
204                    BooleanQuery booleanQuery, String field, String[] values,
205                    boolean like) {
206    
207                    if (values == null) {
208                            return;
209                    }
210    
211                    BooleanQuery query = new BooleanQuery();
212    
213                    for (String value : values) {
214                            addTerm(query, field, value, like);
215                    }
216    
217                    booleanQuery.add(query, BooleanClause.Occur.MUST);
218            }
219    
220            @Override
221            public void addTerm(
222                    BooleanQuery booleanQuery, String field, String value, boolean like) {
223    
224                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
225            }
226    
227            @Override
228            public void addTerm(
229                    BooleanQuery booleanQuery, String field, String value, boolean like,
230                    BooleanClauseOccur booleanClauseOccur) {
231    
232                    if (Validator.isNull(value)) {
233                            return;
234                    }
235    
236                    Analyzer analyzer = getAnalyzer();
237    
238                    if (analyzer instanceof PerFieldAnalyzer) {
239                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
240    
241                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
242    
243                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
244                                    like = true;
245                            }
246                    }
247    
248                    if (like) {
249                            value = StringUtil.replace(
250                                    value, StringPool.PERCENT, StringPool.BLANK);
251                    }
252    
253                    try {
254                            QueryParser queryParser = new QueryParser(
255                                    getVersion(), field, analyzer);
256    
257                            Query query = queryParser.parse(value);
258    
259                            BooleanClause.Occur occur = null;
260    
261                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
262                                    occur = BooleanClause.Occur.MUST;
263                            }
264                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
265                                    occur = BooleanClause.Occur.MUST_NOT;
266                            }
267                            else {
268                                    occur = BooleanClause.Occur.SHOULD;
269                            }
270    
271                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
272                    }
273                    catch (Exception e) {
274                            if (_log.isWarnEnabled()) {
275                                    _log.warn(e, e);
276                            }
277                    }
278            }
279    
280            @Override
281            public void addTerm(
282                    BooleanQuery booleanQuery, String field, String[] values,
283                    boolean like) {
284    
285                    for (String value : values) {
286                            addTerm(booleanQuery, field, value, like);
287                    }
288            }
289    
290            /**
291             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
292             *             IndexSearcher)}
293             */
294            @Deprecated
295            @Override
296            public void cleanUp(IndexSearcher indexSearcher) {
297                    if (indexSearcher == null) {
298                            return;
299                    }
300    
301                    try {
302                            indexSearcher.close();
303    
304                            IndexReader indexReader = indexSearcher.getIndexReader();
305    
306                            if (indexReader != null) {
307                                    indexReader.close();
308                            }
309                    }
310                    catch (IOException ioe) {
311                            _log.error(ioe, ioe);
312                    }
313            }
314    
315            @Override
316            public int countScoredFieldNames(Query query, String[] filedNames) {
317                    int count = 0;
318    
319                    for (String fieldName : filedNames) {
320                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
321                                    query, false, fieldName);
322    
323                            if ((weightedTerms.length > 0) &&
324                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
325    
326                                    count++;
327                            }
328                    }
329    
330                    return count;
331            }
332    
333            @Override
334            public void delete(long companyId) {
335                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
336    
337                    if (indexAccessor == null) {
338                            return;
339                    }
340    
341                    indexAccessor.delete();
342            }
343    
344            @Override
345            public void deleteDocuments(long companyId, Term term) throws IOException {
346                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
347    
348                    if (indexAccessor == null) {
349                            return;
350                    }
351    
352                    indexAccessor.deleteDocuments(term);
353            }
354    
355            @Override
356            public void dumpIndex(long companyId, OutputStream outputStream)
357                    throws IOException {
358    
359                    long lastGeneration = getLastGeneration(companyId);
360    
361                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
362                            if (_log.isDebugEnabled()) {
363                                    _log.debug(
364                                            "Dump index from cluster is not enabled for " + companyId);
365                            }
366    
367                            return;
368                    }
369    
370                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
371    
372                    if (indexAccessor == null) {
373                            return;
374                    }
375    
376                    indexAccessor.dumpIndex(outputStream);
377            }
378    
379            @Override
380            public Analyzer getAnalyzer() {
381                    return _analyzer;
382            }
383    
384            @Override
385            public IndexAccessor getIndexAccessor(long companyId) {
386                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
387    
388                    if (indexAccessor != null) {
389                            return indexAccessor;
390                    }
391    
392                    synchronized (this) {
393                            indexAccessor = _indexAccessors.get(companyId);
394    
395                            if (indexAccessor == null) {
396                                    indexAccessor = new IndexAccessorImpl(companyId);
397    
398                                    if (isLoadIndexFromClusterEnabled()) {
399                                            indexAccessor = new SynchronizedIndexAccessorImpl(
400                                                    indexAccessor);
401    
402                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
403                                                    MessageValuesThreadLocal.getValue(
404                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
405    
406                                            if (clusterForwardMessage) {
407                                                    if (_log.isInfoEnabled()) {
408                                                            _log.info(
409                                                                    "Skip Luncene index files cluster loading " +
410                                                                            "since this is a manual reindex request");
411                                                    }
412                                            }
413                                            else {
414                                                    try {
415                                                            _loadIndexFromCluster(
416                                                                    indexAccessor,
417                                                                    indexAccessor.getLastGeneration());
418                                                    }
419                                                    catch (Exception e) {
420                                                            _log.error(
421                                                                    "Unable to load index for company " +
422                                                                            indexAccessor.getCompanyId(),
423                                                                    e);
424                                                    }
425                                            }
426                                    }
427    
428                                    _indexAccessors.put(companyId, indexAccessor);
429                            }
430                    }
431    
432                    return indexAccessor;
433            }
434    
435            @Override
436            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
437                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
438    
439                    return indexAccessor.acquireIndexSearcher();
440            }
441    
442            @Override
443            public long getLastGeneration(long companyId) {
444                    if (!isLoadIndexFromClusterEnabled()) {
445                            return IndexAccessor.DEFAULT_LAST_GENERATION;
446                    }
447    
448                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
449    
450                    if (indexAccessor == null) {
451                            return IndexAccessor.DEFAULT_LAST_GENERATION;
452                    }
453    
454                    return indexAccessor.getLastGeneration();
455            }
456    
457            @Override
458            public InputStream getLoadIndexesInputStreamFromCluster(
459                    long companyId, Address bootupAddress) {
460    
461                    if (!isLoadIndexFromClusterEnabled()) {
462                            return null;
463                    }
464    
465                    InputStream inputStream = null;
466    
467                    try {
468                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
469                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
470    
471                            URL url = bootupClusterNodeObjectValuePair.getValue();
472    
473                            URLConnection urlConnection = url.openConnection();
474    
475                            urlConnection.setDoOutput(true);
476    
477                            try (UnsyncPrintWriter unsyncPrintWriter =
478                                            UnsyncPrintWriterPool.borrow(
479                                                    urlConnection.getOutputStream())) {
480    
481                                    unsyncPrintWriter.write("transientToken=");
482                                    unsyncPrintWriter.write(
483                                            bootupClusterNodeObjectValuePair.getKey());
484                                    unsyncPrintWriter.write("&companyId=");
485                                    unsyncPrintWriter.write(String.valueOf(companyId));
486                            }
487    
488                            inputStream = urlConnection.getInputStream();
489    
490                            return inputStream;
491                    }
492                    catch (IOException ioe) {
493                            throw new SystemException(ioe);
494                    }
495            }
496    
497            @Override
498            public Set<String> getQueryTerms(Query query) {
499                    String queryString = StringUtil.replace(
500                            query.toString(), StringPool.STAR, StringPool.BLANK);
501    
502                    Query tempQuery = null;
503    
504                    try {
505                            QueryParser queryParser = new QueryParser(
506                                    getVersion(), StringPool.BLANK, getAnalyzer());
507    
508                            tempQuery = queryParser.parse(queryString);
509                    }
510                    catch (Exception e) {
511                            if (_log.isWarnEnabled()) {
512                                    _log.warn("Unable to parse " + queryString);
513                            }
514    
515                            tempQuery = query;
516                    }
517    
518                    WeightedTerm[] weightedTerms = null;
519    
520                    for (String fieldName : Field.KEYWORDS) {
521                            weightedTerms = QueryTermExtractor.getTerms(
522                                    tempQuery, false, fieldName);
523    
524                            if (weightedTerms.length > 0) {
525                                    break;
526                            }
527                    }
528    
529                    Set<String> queryTerms = new HashSet<String>();
530    
531                    for (WeightedTerm weightedTerm : weightedTerms) {
532                            queryTerms.add(weightedTerm.getTerm());
533                    }
534    
535                    return queryTerms;
536            }
537    
538            /**
539             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
540             */
541            @Deprecated
542            @Override
543            public IndexSearcher getSearcher(long companyId, boolean readOnly)
544                    throws IOException {
545    
546                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
547    
548                    IndexReader indexReader = IndexReader.open(
549                            indexAccessor.getLuceneDir(), readOnly);
550    
551                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
552    
553                    indexSearcher.setDefaultFieldSortScoring(true, false);
554                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
555    
556                    return indexSearcher;
557            }
558    
559            @Override
560            public String getSnippet(
561                            Query query, String field, String s, int maxNumFragments,
562                            int fragmentLength, String fragmentSuffix, Formatter formatter)
563                    throws IOException {
564    
565                    QueryScorer queryScorer = new QueryScorer(query, field);
566    
567                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
568    
569                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
570    
571                    TokenStream tokenStream = getAnalyzer().tokenStream(
572                            field, new UnsyncStringReader(s));
573    
574                    try {
575                            String snippet = highlighter.getBestFragments(
576                                    tokenStream, s, maxNumFragments, fragmentSuffix);
577    
578                            if (Validator.isNotNull(snippet) &&
579                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
580                                    !s.equals(snippet)) {
581    
582                                    snippet = snippet.concat(fragmentSuffix);
583                            }
584    
585                            return snippet;
586                    }
587                    catch (InvalidTokenOffsetsException itoe) {
588                            throw new IOException(itoe);
589                    }
590            }
591    
592            @Override
593            public Version getVersion() {
594                    return _version;
595            }
596    
597            @Override
598            public boolean isLoadIndexFromClusterEnabled() {
599                    if (PropsValues.CLUSTER_LINK_ENABLED &&
600                            PropsValues.LUCENE_REPLICATE_WRITE) {
601    
602                            return true;
603                    }
604    
605                    if (_log.isDebugEnabled()) {
606                            _log.debug("Load index from cluster is not enabled");
607                    }
608    
609                    return false;
610            }
611    
612            @Override
613            public void loadIndex(long companyId, InputStream inputStream)
614                    throws IOException {
615    
616                    if (!isLoadIndexFromClusterEnabled()) {
617                            return;
618                    }
619    
620                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
621    
622                    if (indexAccessor == null) {
623                            if (_log.isInfoEnabled()) {
624                                    _log.info(
625                                            "Skip loading Lucene index files for company " + companyId +
626                                                    " in favor of lazy loading");
627                            }
628    
629                            return;
630                    }
631    
632                    StopWatch stopWatch = new StopWatch();
633    
634                    stopWatch.start();
635    
636                    if (_log.isInfoEnabled()) {
637                            _log.info(
638                                    "Start loading Lucene index files for company " + companyId);
639                    }
640    
641                    indexAccessor.loadIndex(inputStream);
642    
643                    if (_log.isInfoEnabled()) {
644                            _log.info(
645                                    "Finished loading index files for company " + companyId +
646                                            " in " + stopWatch.getTime() + " ms");
647                    }
648            }
649    
650            @Override
651            public void loadIndexesFromCluster(long companyId) {
652                    if (!isLoadIndexFromClusterEnabled()) {
653                            return;
654                    }
655    
656                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
657    
658                    if (indexAccessor == null) {
659                            return;
660                    }
661    
662                    long localLastGeneration = getLastGeneration(companyId);
663    
664                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
665            }
666    
667            @Override
668            public void releaseIndexSearcher(
669                            long companyId, IndexSearcher indexSearcher)
670                    throws IOException {
671    
672                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
673    
674                    indexAccessor.releaseIndexSearcher(indexSearcher);
675            }
676    
677            public void setAnalyzer(Analyzer analyzer) {
678                    _analyzer = analyzer;
679            }
680    
681            public void setVersion(Version version) {
682                    _version = version;
683            }
684    
        /**
         * Shuts down the indexing infrastructure and closes every registered index
         * accessor.
         *
         * <p>
         * The steps run in this order: the Lucene index thread pool executor (if
         * any) is stopped and awaited for up to 60 seconds; the load index cluster
         * event listener is removed when loading from the cluster is enabled; for
         * every search engine with a search writer destination, the destination's
         * executor is drained and stopped and the destination is closed; finally
         * every registered index accessor is closed.
         * </p>
         *
         * <p>
         * NOTE(review): both <code>InterruptedException</code> handlers only log;
         * the thread's interrupt status is not restored. Confirm that this is
         * intended.
         * </p>
         */
        @Override
        public void shutdown() {
                if (_luceneIndexThreadPoolExecutor != null) {
                        _luceneIndexThreadPoolExecutor.shutdownNow();

                        try {
                                _luceneIndexThreadPoolExecutor.awaitTermination(
                                        60, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException ie) {
                                _log.error("Lucene indexer shutdown interrupted", ie);
                        }
                }

                if (isLoadIndexFromClusterEnabled()) {
                        ClusterExecutorUtil.removeClusterEventListener(
                                _loadIndexClusterEventListener);
                }

                MessageBus messageBus = MessageBusUtil.getMessageBus();

                for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
                        String searchWriterDestinationName =
                                SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);

                        Destination searchWriteDestination = messageBus.getDestination(
                                searchWriterDestinationName);

                        if (searchWriteDestination != null) {
                                ThreadPoolExecutor threadPoolExecutor =
                                        PortalExecutorManagerUtil.getPortalExecutor(
                                                searchWriterDestinationName);

                                int maxPoolSize = threadPoolExecutor.getMaxPoolSize();

                                // One sync job is submitted per possible pool thread, all
                                // sharing a latch sized to the pool. ShutdownSyncJob is
                                // defined outside this view; presumably each job counts the
                                // latch down so that the await below returns only after the
                                // previously queued work has run - TODO confirm

                                CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);

                                ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
                                        countDownLatch);

                                for (int i = 0; i < maxPoolSize; i++) {
                                        threadPoolExecutor.submit(shutdownSyncJob);
                                }

                                try {
                                        countDownLatch.await();
                                }
                                catch (InterruptedException ie) {
                                        _log.error("Shutdown waiting interrupted", ie);
                                }

                                // Whatever is still queued at this point is dropped and only
                                // reported at debug level

                                List<Runnable> runnables = threadPoolExecutor.shutdownNow();

                                if (_log.isDebugEnabled()) {
                                        _log.debug(
                                                "Cancelled appending indexing jobs: " + runnables);
                                }

                                searchWriteDestination.close(true);
                        }
                }

                // Accessors are closed last, after the writers feeding them have
                // been stopped

                for (IndexAccessor indexAccessor : _indexAccessors.values()) {
                        indexAccessor.close();
                }
        }
751    
752            @Override
753            public void shutdown(long companyId) {
754                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
755    
756                    _indexAccessors.remove(companyId);
757    
758                    indexAccessor.close();
759            }
760    
761            @Override
762            public void startup(long companyId) {
763                    if (!PropsValues.INDEX_ON_STARTUP) {
764                            return;
765                    }
766    
767                    if (_log.isInfoEnabled()) {
768                            _log.info("Indexing Lucene on startup");
769                    }
770    
771                    SearchEngineInitializer searchEngineInitializer =
772                            new SearchEngineInitializer(companyId);
773    
774                    if (PropsValues.INDEX_WITH_THREAD) {
775                            if (_luceneIndexThreadPoolExecutor == null) {
776    
777                                    // This should never be null except for the case where
778                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
779                                    // PropsValues#INDEX_ON_STARTUP to true.
780    
781                                    _luceneIndexThreadPoolExecutor =
782                                            PortalExecutorManagerUtil.getPortalExecutor(
783                                                    LuceneHelperImpl.class.getName());
784                            }
785    
786                            _luceneIndexThreadPoolExecutor.execute(searchEngineInitializer);
787                    }
788                    else {
789                            searchEngineInitializer.reindex();
790                    }
791            }
792    
793            @Override
794            public void updateDocument(long companyId, Term term, Document document)
795                    throws IOException {
796    
797                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
798    
799                    indexAccessor.updateDocument(term, document);
800            }
801    
802            private LuceneHelperImpl() {
803                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
804                            _luceneIndexThreadPoolExecutor =
805                                    PortalExecutorManagerUtil.getPortalExecutor(
806                                            LuceneHelperImpl.class.getName());
807                    }
808    
809                    if (isLoadIndexFromClusterEnabled()) {
810                            _loadIndexClusterEventListener =
811                                    new LoadIndexClusterEventListener();
812    
813                            ClusterExecutorUtil.addClusterEventListener(
814                                    _loadIndexClusterEventListener);
815                    }
816    
817                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
818    
819                    if (StringUtil.equalsIgnoreCase(
820                                    Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
821    
822                            _protocol = Http.HTTPS;
823                    }
824                    else {
825                            _protocol = Http.HTTP;
826                    }
827            }
828    
829            private ObjectValuePair<String, URL>
830                            _getBootupClusterNodeObjectValuePair(Address bootupAddress) {
831    
832                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
833                            new MethodHandler(
834                                    _createTokenMethodKey,
835                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
836                            bootupAddress);
837    
838                    FutureClusterResponses futureClusterResponses =
839                            ClusterExecutorUtil.execute(clusterRequest);
840    
841                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
842                            futureClusterResponses.getPartialResults();
843    
844                    try {
845                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
846                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
847                                    TimeUnit.MILLISECONDS);
848    
849                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
850    
851                            InetSocketAddress inetSocketAddress =
852                                    clusterNode.getPortalInetSocketAddress();
853    
854                            if (inetSocketAddress == null) {
855                                    StringBundler sb = new StringBundler(6);
856    
857                                    sb.append("Invalid cluster node InetSocketAddress ");
858                                    sb.append(". The InetSocketAddress is set by the first ");
859                                    sb.append("request or configured in portal.properties by the");
860                                    sb.append("properties ");
861                                    sb.append("\"portal.instance.http.inet.socket.address\" and ");
862                                    sb.append("\"portal.instance.https.inet.socket.address\".");
863    
864                                    throw new Exception(sb.toString());
865                            }
866    
867                            InetAddress inetAddress = inetSocketAddress.getAddress();
868    
869                            String fileName = PortalUtil.getPathContext();
870    
871                            if (!fileName.endsWith(StringPool.SLASH)) {
872                                    fileName = fileName.concat(StringPool.SLASH);
873                            }
874    
875                            fileName = fileName.concat("lucene/dump");
876    
877                            URL url = new URL(
878                                    _protocol, inetAddress.getHostAddress(),
879                                    inetSocketAddress.getPort(), fileName);
880    
881                            String transientToken = (String)clusterNodeResponse.getResult();
882    
883                            return new ObjectValuePair<String, URL>(transientToken, url);
884                    }
885                    catch (Exception e) {
886                            throw new SystemException(e);
887                    }
888            }
889    
890            private void _includeIfUnique(
891                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
892                    Query query, BooleanClause.Occur occur) {
893    
894                    if (query instanceof TermQuery) {
895                            Set<Term> terms = new HashSet<Term>();
896    
897                            TermQuery termQuery = (TermQuery)query;
898    
899                            termQuery.extractTerms(terms);
900    
901                            float boost = termQuery.getBoost();
902    
903                            for (Term term : terms) {
904                                    String termValue = term.text();
905    
906                                    if (like) {
907                                            termValue = termValue.toLowerCase(queryParser.getLocale());
908    
909                                            term = term.createTerm(
910                                                    StringPool.STAR.concat(termValue).concat(
911                                                            StringPool.STAR));
912    
913                                            query = new WildcardQuery(term);
914                                    }
915                                    else {
916                                            query = new TermQuery(term);
917                                    }
918    
919                                    query.setBoost(boost);
920    
921                                    boolean included = false;
922    
923                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
924                                            if (query.equals(booleanClause.getQuery())) {
925                                                    included = true;
926                                            }
927                                    }
928    
929                                    if (!included) {
930                                            booleanQuery.add(query, occur);
931                                    }
932                            }
933                    }
934                    else if (query instanceof BooleanQuery) {
935                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
936    
937                            BooleanQuery containerBooleanQuery = new BooleanQuery();
938    
939                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
940                                    _includeIfUnique(
941                                            containerBooleanQuery, like, queryParser,
942                                            booleanClause.getQuery(), booleanClause.getOccur());
943                            }
944    
945                            if (containerBooleanQuery.getClauses().length > 0) {
946                                    booleanQuery.add(containerBooleanQuery, occur);
947                            }
948                    }
949                    else {
950                            boolean included = false;
951    
952                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
953                                    if (query.equals(booleanClause.getQuery())) {
954                                            included = true;
955                                    }
956                            }
957    
958                            if (!included) {
959                                    booleanQuery.add(query, occur);
960                            }
961                    }
962            }
963    
964            private void _loadIndexFromCluster(
965                    final IndexAccessor indexAccessor, long localLastGeneration) {
966    
967                    List<Address> clusterNodeAddresses =
968                            ClusterExecutorUtil.getClusterNodeAddresses();
969    
970                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
971    
972                    if (clusterNodeAddressesCount <= 1) {
973                            if (_log.isDebugEnabled()) {
974                                    _log.debug(
975                                            "Do not load indexes because there is either one portal " +
976                                                    "instance or no portal instances in the cluster");
977                            }
978    
979                            return;
980                    }
981    
982                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
983                            new MethodHandler(
984                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
985                            true);
986    
987                    FutureClusterResponses futureClusterResponses =
988                            ClusterExecutorUtil.execute(
989                                    clusterRequest,
990                                    new LoadIndexClusterResponseCallback(
991                                            indexAccessor, clusterNodeAddressesCount,
992                                            localLastGeneration));
993    
994                    futureClusterResponses.addFutureListener(
995                            new BaseFutureListener<ClusterNodeResponses>() {
996    
997                                    @Override
998                                    public void completeWithException(
999                                            Future<ClusterNodeResponses> future, Throwable throwable) {
1000    
1001                                            _log.error(
1002                                                    "Unable to load index for company " +
1003                                                            indexAccessor.getCompanyId(),
1004                                                    throwable);
1005                                    }
1006    
1007                            });
1008            }
1009    
        // Timeout used when polling for cluster node responses, where it is
        // passed with TimeUnit.MILLISECONDS. It is also the argument sent with
        // the remote createToken call.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Value handed to BooleanQuery.setMaxClauseCount by the constructor. It
        // is read from the LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE portal property,
        // with Lucene's current maximum passed as the second argument
        // (presumably the fallback; confirm against GetterUtil.getInteger).

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for the remote invocations sent to other cluster nodes:
        // TransientTokenUtil.createToken(long) is sent as a unicast request and
        // LuceneHelperUtil.getLastGeneration(long) as a multicast request.

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        private Analyzer _analyzer;

        // Index accessors keyed by company ID

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();

        // Only assigned by the constructor when isLoadIndexFromClusterEnabled()
        // returns true; removed from ClusterExecutorUtil again in shutdown()

        private LoadIndexClusterEventListener _loadIndexClusterEventListener;

        // Executor that runs the SearchEngineInitializer on startup. It may be
        // null until startup(long) obtains it.

        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;

        // Http.HTTPS or Http.HTTP, chosen in the constructor from
        // PropsValues.WEB_SERVER_PROTOCOL and used to build the index dump URL

        private String _protocol;
        private Version _version;
1032    
1033            private static class ShutdownSyncJob implements Runnable {
1034    
1035                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1036                            _countDownLatch = countDownLatch;
1037                    }
1038    
1039                    @Override
1040                    public void run() {
1041                            _countDownLatch.countDown();
1042    
1043                            try {
1044                                    synchronized (this) {
1045                                            wait();
1046                                    }
1047                            }
1048                            catch (InterruptedException ie) {
1049                            }
1050                    }
1051    
1052                    private final CountDownLatch _countDownLatch;
1053    
1054            }
1055    
1056            private class LoadIndexClusterEventListener
1057                    implements ClusterEventListener {
1058    
1059                    @Override
1060                    public void processClusterEvent(ClusterEvent clusterEvent) {
1061                            ClusterEventType clusterEventType =
1062                                    clusterEvent.getClusterEventType();
1063    
1064                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1065                                    return;
1066                            }
1067    
1068                            List<Address> clusterNodeAddresses =
1069                                    ClusterExecutorUtil.getClusterNodeAddresses();
1070                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1071    
1072                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1073                                    if (_log.isDebugEnabled()) {
1074                                            _log.debug(
1075                                                    "Number of original cluster members is greater than " +
1076                                                            "one");
1077                                    }
1078    
1079                                    return;
1080                            }
1081    
1082                            long[] companyIds = PortalInstances.getCompanyIds();
1083    
1084                            for (long companyId : companyIds) {
1085                                    loadIndexes(companyId);
1086                            }
1087    
1088                            loadIndexes(CompanyConstants.SYSTEM);
1089                    }
1090    
1091                    private void loadIndexes(long companyId) {
1092                            long lastGeneration = getLastGeneration(companyId);
1093    
1094                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1095                                    return;
1096                            }
1097    
1098                            try {
1099                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1100                            }
1101                            catch (Exception e) {
1102                                    _log.error(
1103                                            "Unable to load indexes for company " + companyId, e);
1104                            }
1105                    }
1106    
1107            }
1108    
1109            private class LoadIndexClusterResponseCallback
1110                    implements ClusterResponseCallback {
1111    
1112                    public LoadIndexClusterResponseCallback(
1113                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1114                            long localLastGeneration) {
1115    
1116                            _indexAccessor = indexAccessor;
1117                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
1118                            _localLastGeneration = localLastGeneration;
1119    
1120                            _companyId = _indexAccessor.getCompanyId();
1121                    }
1122    
1123                    @Override
1124                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1125                            Address bootupAddress = null;
1126    
1127                            do {
1128                                    _clusterNodeAddressesCount--;
1129    
1130                                    ClusterNodeResponse clusterNodeResponse = null;
1131    
1132                                    try {
1133                                            clusterNodeResponse = blockingQueue.poll(
1134                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1135                                                    TimeUnit.MILLISECONDS);
1136                                    }
1137                                    catch (Exception e) {
1138                                            _log.error("Unable to get cluster node response", e);
1139                                    }
1140    
1141                                    if (clusterNodeResponse == null) {
1142                                            if (_log.isDebugEnabled()) {
1143                                                    _log.debug(
1144                                                            "Unable to get cluster node response in " +
1145                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1146                                                                            TimeUnit.MILLISECONDS);
1147                                            }
1148    
1149                                            continue;
1150                                    }
1151    
1152                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1153    
1154                                    if (clusterNode.getPortalInetSocketAddress() != null) {
1155                                            try {
1156                                                    long remoteLastGeneration =
1157                                                            (Long)clusterNodeResponse.getResult();
1158    
1159                                                    if (remoteLastGeneration > _localLastGeneration) {
1160                                                            bootupAddress = clusterNodeResponse.getAddress();
1161    
1162                                                            break;
1163                                                    }
1164                                            }
1165                                            catch (Exception e) {
1166                                                    if (_log.isDebugEnabled()) {
1167                                                            _log.debug(
1168                                                                    "Suppress exception caused by remote method " +
1169                                                                            "invocation",
1170                                                                    e);
1171                                                    }
1172    
1173                                                    continue;
1174                                            }
1175                                    }
1176                                    else if (_log.isDebugEnabled()) {
1177                                            _log.debug(
1178                                                    "Cluster node " + clusterNode +
1179                                                            " has invalid InetSocketAddress");
1180                                    }
1181                            }
1182                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1183    
1184                            if (bootupAddress == null) {
1185                                    return;
1186                            }
1187    
1188                            if (_log.isInfoEnabled()) {
1189                                    _log.info(
1190                                            "Start loading lucene index files from cluster node " +
1191                                                    bootupAddress);
1192                            }
1193    
1194                            InputStream inputStream = null;
1195    
1196                            try {
1197                                    inputStream = getLoadIndexesInputStreamFromCluster(
1198                                            _companyId, bootupAddress);
1199    
1200                                    _indexAccessor.loadIndex(inputStream);
1201    
1202                                    if (_log.isInfoEnabled()) {
1203                                            _log.info("Lucene index files loaded successfully");
1204                                    }
1205                            }
1206                            catch (Exception e) {
1207                                    _log.error("Unable to load index for company " + _companyId, e);
1208                            }
1209                            finally {
1210                                    if (inputStream != null) {
1211                                            try {
1212                                                    inputStream.close();
1213                                            }
1214                                            catch (IOException ioe) {
1215                                                    _log.error(
1216                                                            "Unable to close input stream for company " +
1217                                                                    _companyId,
1218                                                            ioe);
1219                                            }
1220                                    }
1221                            }
1222                    }
1223    
1224                    private int _clusterNodeAddressesCount;
1225                    private long _companyId;
1226                    private IndexAccessor _indexAccessor;
1227                    private long _localLastGeneration;
1228    
1229            }
1230    
1231    }