001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Destination;
036    import com.liferay.portal.kernel.messaging.MessageBus;
037    import com.liferay.portal.kernel.messaging.MessageBusUtil;
038    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039    import com.liferay.portal.kernel.search.BooleanClauseOccur;
040    import com.liferay.portal.kernel.search.Field;
041    import com.liferay.portal.kernel.search.SearchEngineUtil;
042    import com.liferay.portal.kernel.util.ArrayUtil;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.Http;
045    import com.liferay.portal.kernel.util.MethodHandler;
046    import com.liferay.portal.kernel.util.MethodKey;
047    import com.liferay.portal.kernel.util.ObjectValuePair;
048    import com.liferay.portal.kernel.util.PropsKeys;
049    import com.liferay.portal.kernel.util.StringBundler;
050    import com.liferay.portal.kernel.util.StringPool;
051    import com.liferay.portal.kernel.util.StringUtil;
052    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
053    import com.liferay.portal.kernel.util.Validator;
054    import com.liferay.portal.model.CompanyConstants;
055    import com.liferay.portal.search.SearchEngineInitializer;
056    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
057    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
058    import com.liferay.portal.security.auth.TransientTokenUtil;
059    import com.liferay.portal.util.PortalInstances;
060    import com.liferay.portal.util.PortalUtil;
061    import com.liferay.portal.util.PropsUtil;
062    import com.liferay.portal.util.PropsValues;
063    
064    import java.io.IOException;
065    import java.io.InputStream;
066    import java.io.OutputStream;
067    
068    import java.net.InetAddress;
069    import java.net.InetSocketAddress;
070    import java.net.URL;
071    import java.net.URLConnection;
072    
073    import java.util.HashSet;
074    import java.util.List;
075    import java.util.Map;
076    import java.util.Set;
077    import java.util.concurrent.BlockingQueue;
078    import java.util.concurrent.ConcurrentHashMap;
079    import java.util.concurrent.CountDownLatch;
080    import java.util.concurrent.TimeUnit;
081    import java.util.concurrent.TimeoutException;
082    
083    import org.apache.commons.lang.time.StopWatch;
084    import org.apache.lucene.analysis.Analyzer;
085    import org.apache.lucene.analysis.TokenStream;
086    import org.apache.lucene.document.Document;
087    import org.apache.lucene.index.IndexReader;
088    import org.apache.lucene.index.Term;
089    import org.apache.lucene.queryParser.QueryParser;
090    import org.apache.lucene.search.BooleanClause;
091    import org.apache.lucene.search.BooleanQuery;
092    import org.apache.lucene.search.IndexSearcher;
093    import org.apache.lucene.search.NumericRangeQuery;
094    import org.apache.lucene.search.Query;
095    import org.apache.lucene.search.TermQuery;
096    import org.apache.lucene.search.TermRangeQuery;
097    import org.apache.lucene.search.WildcardQuery;
098    import org.apache.lucene.search.highlight.Formatter;
099    import org.apache.lucene.search.highlight.Highlighter;
100    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
101    import org.apache.lucene.search.highlight.QueryScorer;
102    import org.apache.lucene.search.highlight.SimpleFragmenter;
103    import org.apache.lucene.search.highlight.WeightedTerm;
104    import org.apache.lucene.util.Version;
105    
106    /**
107     * @author Brian Wing Shun Chan
108     * @author Harry Mark
109     * @author Bruno Farache
110     * @author Shuyang Zhou
111     * @author Tina Tian
112     * @author Hugo Huijser
113     * @author Andrea Di Giorgi
114     */
115    public class LuceneHelperImpl implements LuceneHelper {
116    
117            @Override
118            public void addDocument(long companyId, Document document)
119                    throws IOException {
120    
121                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
122    
123                    indexAccessor.addDocument(document);
124            }
125    
126            @Override
127            public void addExactTerm(
128                    BooleanQuery booleanQuery, String field, String value) {
129    
130                    addTerm(booleanQuery, field, value, false);
131            }
132    
133            @Override
134            public void addNumericRangeTerm(
135                    BooleanQuery booleanQuery, String field, Integer startValue,
136                    Integer endValue) {
137    
138                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
139                            field, startValue, endValue, true, true);
140    
141                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
142            }
143    
144            @Override
145            public void addNumericRangeTerm(
146                    BooleanQuery booleanQuery, String field, Long startValue,
147                    Long endValue) {
148    
149                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
150                            field, startValue, endValue, true, true);
151    
152                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
153            }
154    
155            /**
156             * @deprecated As of 6.2.0, replaced by {@link
157             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
158             */
159            @Deprecated
160            @Override
161            public void addNumericRangeTerm(
162                    BooleanQuery booleanQuery, String field, String startValue,
163                    String endValue) {
164    
165                    addNumericRangeTerm(
166                            booleanQuery, field, GetterUtil.getLong(startValue),
167                            GetterUtil.getLong(endValue));
168            }
169    
170            @Override
171            public void addRangeTerm(
172                    BooleanQuery booleanQuery, String field, String startValue,
173                    String endValue) {
174    
175                    boolean includesLower = true;
176    
177                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
178                            includesLower = false;
179                    }
180    
181                    boolean includesUpper = true;
182    
183                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
184                            includesUpper = false;
185                    }
186    
187                    TermRangeQuery termRangeQuery = new TermRangeQuery(
188                            field, startValue, endValue, includesLower, includesUpper);
189    
190                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
191            }
192    
193            @Override
194            public void addRequiredTerm(
195                    BooleanQuery booleanQuery, String field, String value, boolean like) {
196    
197                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
198            }
199    
200            @Override
201            public void addRequiredTerm(
202                    BooleanQuery booleanQuery, String field, String[] values,
203                    boolean like) {
204    
205                    if (values == null) {
206                            return;
207                    }
208    
209                    BooleanQuery query = new BooleanQuery();
210    
211                    for (String value : values) {
212                            addTerm(query, field, value, like);
213                    }
214    
215                    booleanQuery.add(query, BooleanClause.Occur.MUST);
216            }
217    
218            @Override
219            public void addTerm(
220                    BooleanQuery booleanQuery, String field, String value, boolean like) {
221    
222                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
223            }
224    
225            @Override
226            public void addTerm(
227                    BooleanQuery booleanQuery, String field, String value, boolean like,
228                    BooleanClauseOccur booleanClauseOccur) {
229    
230                    if (Validator.isNull(value)) {
231                            return;
232                    }
233    
234                    Analyzer analyzer = getAnalyzer();
235    
236                    if (analyzer instanceof PerFieldAnalyzer) {
237                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
238    
239                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
240    
241                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
242                                    like = true;
243                            }
244                    }
245    
246                    if (like) {
247                            value = StringUtil.replace(
248                                    value, StringPool.PERCENT, StringPool.BLANK);
249                    }
250    
251                    try {
252                            QueryParser queryParser = new QueryParser(
253                                    getVersion(), field, analyzer);
254    
255                            Query query = queryParser.parse(value);
256    
257                            BooleanClause.Occur occur = null;
258    
259                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
260                                    occur = BooleanClause.Occur.MUST;
261                            }
262                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
263                                    occur = BooleanClause.Occur.MUST_NOT;
264                            }
265                            else {
266                                    occur = BooleanClause.Occur.SHOULD;
267                            }
268    
269                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
270                    }
271                    catch (Exception e) {
272                            if (_log.isWarnEnabled()) {
273                                    _log.warn(e, e);
274                            }
275                    }
276            }
277    
278            @Override
279            public void addTerm(
280                    BooleanQuery booleanQuery, String field, String[] values,
281                    boolean like) {
282    
283                    for (String value : values) {
284                            addTerm(booleanQuery, field, value, like);
285                    }
286            }
287    
288            /**
289             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
290             *             IndexSearcher)}
291             */
292            @Deprecated
293            @Override
294            public void cleanUp(IndexSearcher indexSearcher) {
295                    if (indexSearcher == null) {
296                            return;
297                    }
298    
299                    try {
300                            indexSearcher.close();
301    
302                            IndexReader indexReader = indexSearcher.getIndexReader();
303    
304                            if (indexReader != null) {
305                                    indexReader.close();
306                            }
307                    }
308                    catch (IOException ioe) {
309                            _log.error(ioe, ioe);
310                    }
311            }
312    
313            @Override
314            public int countScoredFieldNames(Query query, String[] filedNames) {
315                    int count = 0;
316    
317                    for (String fieldName : filedNames) {
318                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
319                                    query, false, fieldName);
320    
321                            if ((weightedTerms.length > 0) &&
322                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
323    
324                                    count++;
325                            }
326                    }
327    
328                    return count;
329            }
330    
331            @Override
332            public void delete(long companyId) {
333                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
334    
335                    if (indexAccessor == null) {
336                            return;
337                    }
338    
339                    indexAccessor.delete();
340            }
341    
342            @Override
343            public void deleteDocuments(long companyId, Term term) throws IOException {
344                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
345    
346                    if (indexAccessor == null) {
347                            return;
348                    }
349    
350                    indexAccessor.deleteDocuments(term);
351            }
352    
353            @Override
354            public void dumpIndex(long companyId, OutputStream outputStream)
355                    throws IOException {
356    
357                    long lastGeneration = getLastGeneration(companyId);
358    
359                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
360                            if (_log.isDebugEnabled()) {
361                                    _log.debug(
362                                            "Dump index from cluster is not enabled for " + companyId);
363                            }
364    
365                            return;
366                    }
367    
368                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
369    
370                    if (indexAccessor == null) {
371                            return;
372                    }
373    
374                    indexAccessor.dumpIndex(outputStream);
375            }
376    
377            @Override
378            public Analyzer getAnalyzer() {
379                    return _analyzer;
380            }
381    
382            @Override
383            public IndexAccessor getIndexAccessor(long companyId) {
384                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
385    
386                    if (indexAccessor != null) {
387                            return indexAccessor;
388                    }
389    
390                    synchronized (this) {
391                            indexAccessor = _indexAccessors.get(companyId);
392    
393                            if (indexAccessor == null) {
394                                    indexAccessor = new IndexAccessorImpl(companyId);
395    
396                                    if (isLoadIndexFromClusterEnabled()) {
397                                            indexAccessor = new SynchronizedIndexAccessorImpl(
398                                                    indexAccessor);
399    
400                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
401                                                    MessageValuesThreadLocal.getValue(
402                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
403    
404                                            if (clusterForwardMessage) {
405                                                    if (_log.isInfoEnabled()) {
406                                                            _log.info(
407                                                                    "Skip Luncene index files cluster loading " +
408                                                                            "since this is a manual reindex request");
409                                                    }
410                                            }
411                                            else {
412                                                    try {
413                                                            _loadIndexFromCluster(
414                                                                    indexAccessor,
415                                                                    indexAccessor.getLastGeneration());
416                                                    }
417                                                    catch (Exception e) {
418                                                            _log.error(
419                                                                    "Unable to load index for company " +
420                                                                            indexAccessor.getCompanyId(),
421                                                                    e);
422                                                    }
423                                            }
424                                    }
425    
426                                    _indexAccessors.put(companyId, indexAccessor);
427                            }
428                    }
429    
430                    return indexAccessor;
431            }
432    
433            @Override
434            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
435                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
436    
437                    return indexAccessor.acquireIndexSearcher();
438            }
439    
440            @Override
441            public long getLastGeneration(long companyId) {
442                    if (!isLoadIndexFromClusterEnabled()) {
443                            return IndexAccessor.DEFAULT_LAST_GENERATION;
444                    }
445    
446                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
447    
448                    if (indexAccessor == null) {
449                            return IndexAccessor.DEFAULT_LAST_GENERATION;
450                    }
451    
452                    return indexAccessor.getLastGeneration();
453            }
454    
455            @Override
456            public InputStream getLoadIndexesInputStreamFromCluster(
457                    long companyId, Address bootupAddress) {
458    
459                    if (!isLoadIndexFromClusterEnabled()) {
460                            return null;
461                    }
462    
463                    InputStream inputStream = null;
464    
465                    try {
466                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
467                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
468    
469                            URL url = bootupClusterNodeObjectValuePair.getValue();
470    
471                            URLConnection urlConnection = url.openConnection();
472    
473                            urlConnection.setDoOutput(true);
474    
475                            try (UnsyncPrintWriter unsyncPrintWriter =
476                                            UnsyncPrintWriterPool.borrow(
477                                                    urlConnection.getOutputStream())) {
478    
479                                    unsyncPrintWriter.write("transientToken=");
480                                    unsyncPrintWriter.write(
481                                            bootupClusterNodeObjectValuePair.getKey());
482                                    unsyncPrintWriter.write("&companyId=");
483                                    unsyncPrintWriter.write(String.valueOf(companyId));
484                            }
485    
486                            inputStream = urlConnection.getInputStream();
487    
488                            return inputStream;
489                    }
490                    catch (IOException ioe) {
491                            throw new SystemException(ioe);
492                    }
493            }
494    
495            @Override
496            public Set<String> getQueryTerms(Query query) {
497                    String queryString = StringUtil.replace(
498                            query.toString(), StringPool.STAR, StringPool.BLANK);
499    
500                    Query tempQuery = null;
501    
502                    try {
503                            QueryParser queryParser = new QueryParser(
504                                    getVersion(), StringPool.BLANK, getAnalyzer());
505    
506                            tempQuery = queryParser.parse(queryString);
507                    }
508                    catch (Exception e) {
509                            if (_log.isWarnEnabled()) {
510                                    _log.warn("Unable to parse " + queryString);
511                            }
512    
513                            tempQuery = query;
514                    }
515    
516                    WeightedTerm[] weightedTerms = null;
517    
518                    for (String fieldName : Field.KEYWORDS) {
519                            weightedTerms = QueryTermExtractor.getTerms(
520                                    tempQuery, false, fieldName);
521    
522                            if (weightedTerms.length > 0) {
523                                    break;
524                            }
525                    }
526    
527                    Set<String> queryTerms = new HashSet<String>();
528    
529                    for (WeightedTerm weightedTerm : weightedTerms) {
530                            queryTerms.add(weightedTerm.getTerm());
531                    }
532    
533                    return queryTerms;
534            }
535    
536            /**
537             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
538             */
539            @Deprecated
540            @Override
541            public IndexSearcher getSearcher(long companyId, boolean readOnly)
542                    throws IOException {
543    
544                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
545    
546                    IndexReader indexReader = IndexReader.open(
547                            indexAccessor.getLuceneDir(), readOnly);
548    
549                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
550    
551                    indexSearcher.setDefaultFieldSortScoring(true, false);
552                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
553    
554                    return indexSearcher;
555            }
556    
557            @Override
558            public String getSnippet(
559                            Query query, String field, String s, int maxNumFragments,
560                            int fragmentLength, String fragmentSuffix, Formatter formatter)
561                    throws IOException {
562    
563                    QueryScorer queryScorer = new QueryScorer(query, field);
564    
565                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
566    
567                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
568    
569                    TokenStream tokenStream = getAnalyzer().tokenStream(
570                            field, new UnsyncStringReader(s));
571    
572                    try {
573                            String snippet = highlighter.getBestFragments(
574                                    tokenStream, s, maxNumFragments, fragmentSuffix);
575    
576                            if (Validator.isNotNull(snippet) &&
577                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
578                                    !s.equals(snippet)) {
579    
580                                    snippet = snippet.concat(fragmentSuffix);
581                            }
582    
583                            return snippet;
584                    }
585                    catch (InvalidTokenOffsetsException itoe) {
586                            throw new IOException(itoe);
587                    }
588            }
589    
590            @Override
591            public Version getVersion() {
592                    return _version;
593            }
594    
595            @Override
596            public boolean isLoadIndexFromClusterEnabled() {
597                    if (PropsValues.CLUSTER_LINK_ENABLED &&
598                            PropsValues.LUCENE_REPLICATE_WRITE) {
599    
600                            return true;
601                    }
602    
603                    if (_log.isDebugEnabled()) {
604                            _log.debug("Load index from cluster is not enabled");
605                    }
606    
607                    return false;
608            }
609    
610            @Override
611            public void loadIndex(long companyId, InputStream inputStream)
612                    throws IOException {
613    
614                    if (!isLoadIndexFromClusterEnabled()) {
615                            return;
616                    }
617    
618                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
619    
620                    if (indexAccessor == null) {
621                            if (_log.isInfoEnabled()) {
622                                    _log.info(
623                                            "Skip loading Lucene index files for company " + companyId +
624                                                    " in favor of lazy loading");
625                            }
626    
627                            return;
628                    }
629    
630                    StopWatch stopWatch = new StopWatch();
631    
632                    stopWatch.start();
633    
634                    if (_log.isInfoEnabled()) {
635                            _log.info(
636                                    "Start loading Lucene index files for company " + companyId);
637                    }
638    
639                    indexAccessor.loadIndex(inputStream);
640    
641                    if (_log.isInfoEnabled()) {
642                            _log.info(
643                                    "Finished loading index files for company " + companyId +
644                                            " in " + stopWatch.getTime() + " ms");
645                    }
646            }
647    
648            @Override
649            public void loadIndexesFromCluster(long companyId) {
650                    if (!isLoadIndexFromClusterEnabled()) {
651                            return;
652                    }
653    
654                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
655    
656                    if (indexAccessor == null) {
657                            return;
658                    }
659    
660                    long localLastGeneration = getLastGeneration(companyId);
661    
662                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
663            }
664    
665            @Override
666            public void releaseIndexSearcher(
667                            long companyId, IndexSearcher indexSearcher)
668                    throws IOException {
669    
670                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
671    
672                    indexAccessor.releaseIndexSearcher(indexSearcher);
673            }
674    
675            public void setAnalyzer(Analyzer analyzer) {
676                    _analyzer = analyzer;
677            }
678    
679            public void setVersion(Version version) {
680                    _version = version;
681            }
682    
683            @Override
684            public void shutdown() {
685                    if (_luceneIndexThreadPoolExecutor != null) {
686                            _luceneIndexThreadPoolExecutor.shutdownNow();
687    
688                            try {
689                                    _luceneIndexThreadPoolExecutor.awaitTermination(
690                                            60, TimeUnit.SECONDS);
691                            }
692                            catch (InterruptedException ie) {
693                                    _log.error("Lucene indexer shutdown interrupted", ie);
694                            }
695                    }
696    
697                    if (isLoadIndexFromClusterEnabled()) {
698                            ClusterExecutorUtil.removeClusterEventListener(
699                                    _loadIndexClusterEventListener);
700                    }
701    
702                    MessageBus messageBus = MessageBusUtil.getMessageBus();
703    
704                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
705                            String searchWriterDestinationName =
706                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
707    
708                            Destination searchWriteDestination = messageBus.getDestination(
709                                    searchWriterDestinationName);
710    
711                            if (searchWriteDestination != null) {
712                                    ThreadPoolExecutor threadPoolExecutor =
713                                            PortalExecutorManagerUtil.getPortalExecutor(
714                                                    searchWriterDestinationName);
715    
716                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
717    
718                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
719    
720                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
721                                            countDownLatch);
722    
723                                    for (int i = 0; i < maxPoolSize; i++) {
724                                            threadPoolExecutor.submit(shutdownSyncJob);
725                                    }
726    
727                                    try {
728                                            countDownLatch.await();
729                                    }
730                                    catch (InterruptedException ie) {
731                                            _log.error("Shutdown waiting interrupted", ie);
732                                    }
733    
734                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
735    
736                                    if (_log.isDebugEnabled()) {
737                                            _log.debug(
738                                                    "Cancelled appending indexing jobs: " + runnables);
739                                    }
740    
741                                    searchWriteDestination.close(true);
742                            }
743                    }
744    
745                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
746                            indexAccessor.close();
747                    }
748            }
749    
750            @Override
751            public void shutdown(long companyId) {
752                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
753    
754                    _indexAccessors.remove(companyId);
755    
756                    indexAccessor.close();
757            }
758    
759            @Override
760            public void startup(long companyId) {
761                    if (!PropsValues.INDEX_ON_STARTUP) {
762                            return;
763                    }
764    
765                    if (_log.isInfoEnabled()) {
766                            _log.info("Indexing Lucene on startup");
767                    }
768    
769                    SearchEngineInitializer searchEngineInitializer =
770                            new SearchEngineInitializer(companyId);
771    
772                    if (PropsValues.INDEX_WITH_THREAD) {
773                            if (_luceneIndexThreadPoolExecutor == null) {
774    
775                                    // This should never be null except for the case where
776                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
777                                    // PropsValues#INDEX_ON_STARTUP to true.
778    
779                                    _luceneIndexThreadPoolExecutor =
780                                            PortalExecutorManagerUtil.getPortalExecutor(
781                                                    LuceneHelperImpl.class.getName());
782                            }
783    
784                            _luceneIndexThreadPoolExecutor.execute(searchEngineInitializer);
785                    }
786                    else {
787                            searchEngineInitializer.reindex();
788                    }
789            }
790    
791            @Override
792            public void updateDocument(long companyId, Term term, Document document)
793                    throws IOException {
794    
795                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
796    
797                    indexAccessor.updateDocument(term, document);
798            }
799    
800            private LuceneHelperImpl() {
801                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
802                            _luceneIndexThreadPoolExecutor =
803                                    PortalExecutorManagerUtil.getPortalExecutor(
804                                            LuceneHelperImpl.class.getName());
805                    }
806    
807                    if (isLoadIndexFromClusterEnabled()) {
808                            _loadIndexClusterEventListener =
809                                    new LoadIndexClusterEventListener();
810    
811                            ClusterExecutorUtil.addClusterEventListener(
812                                    _loadIndexClusterEventListener);
813                    }
814    
815                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
816    
817                    if (StringUtil.equalsIgnoreCase(
818                                    Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL)) {
819    
820                            _protocol = Http.HTTPS;
821                    }
822                    else {
823                            _protocol = Http.HTTP;
824                    }
825            }
826    
827            private ObjectValuePair<String, URL>
828                            _getBootupClusterNodeObjectValuePair(Address bootupAddress) {
829    
830                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
831                            new MethodHandler(
832                                    _createTokenMethodKey,
833                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
834                            bootupAddress);
835    
836                    FutureClusterResponses futureClusterResponses =
837                            ClusterExecutorUtil.execute(clusterRequest);
838    
839                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
840                            futureClusterResponses.getPartialResults();
841    
842                    try {
843                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
844                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
845                                    TimeUnit.MILLISECONDS);
846    
847                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
848    
849                            InetSocketAddress inetSocketAddress =
850                                    clusterNode.getPortalInetSocketAddress();
851    
852                            if (inetSocketAddress == null) {
853                                    StringBundler sb = new StringBundler(6);
854    
855                                    sb.append("Invalid cluster node InetSocketAddress ");
856                                    sb.append(". The InetSocketAddress is set by the first ");
857                                    sb.append("request or configured in portal.properties by the");
858                                    sb.append("properties ");
859                                    sb.append("\"portal.instance.http.inet.socket.address\" and ");
860                                    sb.append("\"portal.instance.https.inet.socket.address\".");
861    
862                                    throw new Exception(sb.toString());
863                            }
864    
865                            InetAddress inetAddress = inetSocketAddress.getAddress();
866    
867                            String fileName = PortalUtil.getPathContext();
868    
869                            if (!fileName.endsWith(StringPool.SLASH)) {
870                                    fileName = fileName.concat(StringPool.SLASH);
871                            }
872    
873                            fileName = fileName.concat("lucene/dump");
874    
875                            URL url = new URL(
876                                    _protocol, inetAddress.getHostAddress(),
877                                    inetSocketAddress.getPort(), fileName);
878    
879                            String transientToken = (String)clusterNodeResponse.getResult();
880    
881                            return new ObjectValuePair<String, URL>(transientToken, url);
882                    }
883                    catch (Exception e) {
884                            throw new SystemException(e);
885                    }
886            }
887    
888            private void _includeIfUnique(
889                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
890                    Query query, BooleanClause.Occur occur) {
891    
892                    if (query instanceof TermQuery) {
893                            Set<Term> terms = new HashSet<Term>();
894    
895                            TermQuery termQuery = (TermQuery)query;
896    
897                            termQuery.extractTerms(terms);
898    
899                            float boost = termQuery.getBoost();
900    
901                            for (Term term : terms) {
902                                    String termValue = term.text();
903    
904                                    if (like) {
905                                            termValue = termValue.toLowerCase(queryParser.getLocale());
906    
907                                            term = term.createTerm(
908                                                    StringPool.STAR.concat(termValue).concat(
909                                                            StringPool.STAR));
910    
911                                            query = new WildcardQuery(term);
912                                    }
913                                    else {
914                                            query = new TermQuery(term);
915                                    }
916    
917                                    query.setBoost(boost);
918    
919                                    boolean included = false;
920    
921                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
922                                            if (query.equals(booleanClause.getQuery())) {
923                                                    included = true;
924                                            }
925                                    }
926    
927                                    if (!included) {
928                                            booleanQuery.add(query, occur);
929                                    }
930                            }
931                    }
932                    else if (query instanceof BooleanQuery) {
933                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
934    
935                            BooleanQuery containerBooleanQuery = new BooleanQuery();
936    
937                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
938                                    _includeIfUnique(
939                                            containerBooleanQuery, like, queryParser,
940                                            booleanClause.getQuery(), booleanClause.getOccur());
941                            }
942    
943                            if (containerBooleanQuery.getClauses().length > 0) {
944                                    booleanQuery.add(containerBooleanQuery, occur);
945                            }
946                    }
947                    else {
948                            boolean included = false;
949    
950                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
951                                    if (query.equals(booleanClause.getQuery())) {
952                                            included = true;
953                                    }
954                            }
955    
956                            if (!included) {
957                                    booleanQuery.add(query, occur);
958                            }
959                    }
960            }
961    
962            private void _loadIndexFromCluster(
963                    IndexAccessor indexAccessor, long localLastGeneration) {
964    
965                    List<Address> clusterNodeAddresses =
966                            ClusterExecutorUtil.getClusterNodeAddresses();
967    
968                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
969    
970                    if (clusterNodeAddressesCount <= 1) {
971                            if (_log.isDebugEnabled()) {
972                                    _log.debug(
973                                            "Do not load indexes because there is either one portal " +
974                                                    "instance or no portal instances in the cluster");
975                            }
976    
977                            return;
978                    }
979    
980                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
981                            new MethodHandler(
982                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
983                            true);
984    
985                    ClusterExecutorUtil.execute(
986                            clusterRequest,
987                            new LoadIndexClusterResponseCallback(
988                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
989            }
990    
991            private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
992                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
993    
994            private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
995                    GetterUtil.getInteger(
996                            PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
997                            BooleanQuery.getMaxClauseCount());
998    
999            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
1000    
1001            private static MethodKey _createTokenMethodKey = new MethodKey(
1002                    TransientTokenUtil.class, "createToken", long.class);
1003            private static MethodKey _getLastGenerationMethodKey = new MethodKey(
1004                    LuceneHelperUtil.class, "getLastGeneration", long.class);
1005    
1006            private Analyzer _analyzer;
1007            private Map<Long, IndexAccessor> _indexAccessors =
1008                    new ConcurrentHashMap<Long, IndexAccessor>();
1009            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
1010            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
1011            private String _protocol;
1012            private Version _version;
1013    
1014            private static class ShutdownSyncJob implements Runnable {
1015    
1016                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1017                            _countDownLatch = countDownLatch;
1018                    }
1019    
1020                    @Override
1021                    public void run() {
1022                            _countDownLatch.countDown();
1023    
1024                            try {
1025                                    synchronized (this) {
1026                                            wait();
1027                                    }
1028                            }
1029                            catch (InterruptedException ie) {
1030                            }
1031                    }
1032    
1033                    private final CountDownLatch _countDownLatch;
1034    
1035            }
1036    
1037            private class LoadIndexClusterEventListener
1038                    implements ClusterEventListener {
1039    
1040                    @Override
1041                    public void processClusterEvent(ClusterEvent clusterEvent) {
1042                            ClusterEventType clusterEventType =
1043                                    clusterEvent.getClusterEventType();
1044    
1045                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1046                                    return;
1047                            }
1048    
1049                            List<Address> clusterNodeAddresses =
1050                                    ClusterExecutorUtil.getClusterNodeAddresses();
1051                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1052    
1053                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1054                                    if (_log.isDebugEnabled()) {
1055                                            _log.debug(
1056                                                    "Number of original cluster members is greater than " +
1057                                                            "one");
1058                                    }
1059    
1060                                    return;
1061                            }
1062    
1063                            long[] companyIds = PortalInstances.getCompanyIds();
1064    
1065                            for (long companyId : companyIds) {
1066                                    loadIndexes(companyId);
1067                            }
1068    
1069                            loadIndexes(CompanyConstants.SYSTEM);
1070                    }
1071    
1072                    private void loadIndexes(long companyId) {
1073                            long lastGeneration = getLastGeneration(companyId);
1074    
1075                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1076                                    return;
1077                            }
1078    
1079                            try {
1080                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1081                            }
1082                            catch (Exception e) {
1083                                    _log.error(
1084                                            "Unable to load indexes for company " + companyId, e);
1085                            }
1086                    }
1087    
1088            }
1089    
1090            private class LoadIndexClusterResponseCallback
1091                    extends BaseClusterResponseCallback {
1092    
1093                    public LoadIndexClusterResponseCallback(
1094                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
1095                            long localLastGeneration) {
1096    
1097                            _indexAccessor = indexAccessor;
1098                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
1099                            _localLastGeneration = localLastGeneration;
1100    
1101                            _companyId = _indexAccessor.getCompanyId();
1102                    }
1103    
1104                    @Override
1105                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1106                            Address bootupAddress = null;
1107    
1108                            do {
1109                                    _clusterNodeAddressesCount--;
1110    
1111                                    ClusterNodeResponse clusterNodeResponse = null;
1112    
1113                                    try {
1114                                            clusterNodeResponse = blockingQueue.poll(
1115                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1116                                                    TimeUnit.MILLISECONDS);
1117                                    }
1118                                    catch (Exception e) {
1119                                            _log.error("Unable to get cluster node response", e);
1120                                    }
1121    
1122                                    if (clusterNodeResponse == null) {
1123                                            if (_log.isDebugEnabled()) {
1124                                                    _log.debug(
1125                                                            "Unable to get cluster node response in " +
1126                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1127                                                                            TimeUnit.MILLISECONDS);
1128                                            }
1129    
1130                                            continue;
1131                                    }
1132    
1133                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1134    
1135                                    if (clusterNode.getPortalInetSocketAddress() != null) {
1136                                            try {
1137                                                    long remoteLastGeneration =
1138                                                            (Long)clusterNodeResponse.getResult();
1139    
1140                                                    if (remoteLastGeneration > _localLastGeneration) {
1141                                                            bootupAddress = clusterNodeResponse.getAddress();
1142    
1143                                                            break;
1144                                                    }
1145                                            }
1146                                            catch (Exception e) {
1147                                                    if (_log.isDebugEnabled()) {
1148                                                            _log.debug(
1149                                                                    "Suppress exception caused by remote method " +
1150                                                                            "invocation",
1151                                                                    e);
1152                                                    }
1153    
1154                                                    continue;
1155                                            }
1156                                    }
1157                                    else if (_log.isDebugEnabled()) {
1158                                            _log.debug(
1159                                                    "Cluster node " + clusterNode +
1160                                                            " has invalid InetSocketAddress");
1161                                    }
1162                            }
1163                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1164    
1165                            if (bootupAddress == null) {
1166                                    return;
1167                            }
1168    
1169                            if (_log.isInfoEnabled()) {
1170                                    _log.info(
1171                                            "Start loading lucene index files from cluster node " +
1172                                                    bootupAddress);
1173                            }
1174    
1175                            InputStream inputStream = null;
1176    
1177                            try {
1178                                    inputStream = getLoadIndexesInputStreamFromCluster(
1179                                            _companyId, bootupAddress);
1180    
1181                                    _indexAccessor.loadIndex(inputStream);
1182    
1183                                    if (_log.isInfoEnabled()) {
1184                                            _log.info("Lucene index files loaded successfully");
1185                                    }
1186                            }
1187                            catch (Exception e) {
1188                                    _log.error("Unable to load index for company " + _companyId, e);
1189                            }
1190                            finally {
1191                                    if (inputStream != null) {
1192                                            try {
1193                                                    inputStream.close();
1194                                            }
1195                                            catch (IOException ioe) {
1196                                                    _log.error(
1197                                                            "Unable to close input stream for company " +
1198                                                                    _companyId,
1199                                                            ioe);
1200                                            }
1201                                    }
1202                            }
1203                    }
1204    
1205                    @Override
1206                    public void processTimeoutException(TimeoutException timeoutException) {
1207                            _log.error(
1208                                    "Unable to load index for company " + _companyId,
1209                                    timeoutException);
1210                    }
1211    
1212                    private int _clusterNodeAddressesCount;
1213                    private long _companyId;
1214                    private IndexAccessor _indexAccessor;
1215                    private long _localLastGeneration;
1216    
1217            }
1218    
1219    }