001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Destination;
036    import com.liferay.portal.kernel.messaging.MessageBus;
037    import com.liferay.portal.kernel.messaging.MessageBusUtil;
038    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
039    import com.liferay.portal.kernel.search.BooleanClauseOccur;
040    import com.liferay.portal.kernel.search.Field;
041    import com.liferay.portal.kernel.search.SearchEngineUtil;
042    import com.liferay.portal.kernel.util.ArrayUtil;
043    import com.liferay.portal.kernel.util.GetterUtil;
044    import com.liferay.portal.kernel.util.MethodHandler;
045    import com.liferay.portal.kernel.util.MethodKey;
046    import com.liferay.portal.kernel.util.ObjectValuePair;
047    import com.liferay.portal.kernel.util.PropsKeys;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.kernel.util.StringUtil;
050    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051    import com.liferay.portal.kernel.util.Validator;
052    import com.liferay.portal.model.CompanyConstants;
053    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055    import com.liferay.portal.security.auth.TransientTokenUtil;
056    import com.liferay.portal.util.PortalInstances;
057    import com.liferay.portal.util.PortalUtil;
058    import com.liferay.portal.util.PropsUtil;
059    import com.liferay.portal.util.PropsValues;
060    
061    import java.io.IOException;
062    import java.io.InputStream;
063    import java.io.OutputStream;
064    
065    import java.net.InetAddress;
066    import java.net.URL;
067    import java.net.URLConnection;
068    
069    import java.util.HashSet;
070    import java.util.List;
071    import java.util.Map;
072    import java.util.Set;
073    import java.util.concurrent.BlockingQueue;
074    import java.util.concurrent.ConcurrentHashMap;
075    import java.util.concurrent.TimeUnit;
076    import java.util.concurrent.TimeoutException;
077    
078    import org.apache.commons.lang.time.StopWatch;
079    import org.apache.lucene.analysis.Analyzer;
080    import org.apache.lucene.analysis.TokenStream;
081    import org.apache.lucene.document.Document;
082    import org.apache.lucene.index.IndexReader;
083    import org.apache.lucene.index.Term;
084    import org.apache.lucene.queryParser.QueryParser;
085    import org.apache.lucene.search.BooleanClause;
086    import org.apache.lucene.search.BooleanQuery;
087    import org.apache.lucene.search.IndexSearcher;
088    import org.apache.lucene.search.NumericRangeQuery;
089    import org.apache.lucene.search.Query;
090    import org.apache.lucene.search.TermQuery;
091    import org.apache.lucene.search.TermRangeQuery;
092    import org.apache.lucene.search.WildcardQuery;
093    import org.apache.lucene.search.highlight.Formatter;
094    import org.apache.lucene.search.highlight.Highlighter;
095    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096    import org.apache.lucene.search.highlight.QueryScorer;
097    import org.apache.lucene.search.highlight.SimpleFragmenter;
098    import org.apache.lucene.search.highlight.WeightedTerm;
099    import org.apache.lucene.util.Version;
100    
101    /**
102     * @author Brian Wing Shun Chan
103     * @author Harry Mark
104     * @author Bruno Farache
105     * @author Shuyang Zhou
106     * @author Tina Tian
107     * @author Hugo Huijser
108     * @author Andrea Di Giorgi
109     */
110    public class LuceneHelperImpl implements LuceneHelper {
111    
112            @Override
113            public void addDocument(long companyId, Document document)
114                    throws IOException {
115    
116                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
117    
118                    indexAccessor.addDocument(document);
119            }
120    
121            @Override
122            public void addExactTerm(
123                    BooleanQuery booleanQuery, String field, String value) {
124    
125                    addTerm(booleanQuery, field, value, false);
126            }
127    
128            @Override
129            public void addNumericRangeTerm(
130                    BooleanQuery booleanQuery, String field, Integer startValue,
131                    Integer endValue) {
132    
133                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134                            field, startValue, endValue, true, true);
135    
136                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137            }
138    
139            @Override
140            public void addNumericRangeTerm(
141                    BooleanQuery booleanQuery, String field, Long startValue,
142                    Long endValue) {
143    
144                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145                            field, startValue, endValue, true, true);
146    
147                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148            }
149    
150            /**
151             * @deprecated As of 6.2.0, replaced by {@link
152             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
153             */
154            @Override
155            public void addNumericRangeTerm(
156                    BooleanQuery booleanQuery, String field, String startValue,
157                    String endValue) {
158    
159                    addNumericRangeTerm(
160                            booleanQuery, field, GetterUtil.getLong(startValue),
161                            GetterUtil.getLong(endValue));
162            }
163    
164            @Override
165            public void addRangeTerm(
166                    BooleanQuery booleanQuery, String field, String startValue,
167                    String endValue) {
168    
169                    boolean includesLower = true;
170    
171                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172                            includesLower = false;
173                    }
174    
175                    boolean includesUpper = true;
176    
177                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178                            includesUpper = false;
179                    }
180    
181                    TermRangeQuery termRangeQuery = new TermRangeQuery(
182                            field, startValue, endValue, includesLower, includesUpper);
183    
184                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185            }
186    
187            @Override
188            public void addRequiredTerm(
189                    BooleanQuery booleanQuery, String field, String value, boolean like) {
190    
191                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192            }
193    
194            @Override
195            public void addRequiredTerm(
196                    BooleanQuery booleanQuery, String field, String[] values,
197                    boolean like) {
198    
199                    if (values == null) {
200                            return;
201                    }
202    
203                    BooleanQuery query = new BooleanQuery();
204    
205                    for (String value : values) {
206                            addTerm(query, field, value, like);
207                    }
208    
209                    booleanQuery.add(query, BooleanClause.Occur.MUST);
210            }
211    
212            @Override
213            public void addTerm(
214                    BooleanQuery booleanQuery, String field, String value, boolean like) {
215    
216                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217            }
218    
219            @Override
220            public void addTerm(
221                    BooleanQuery booleanQuery, String field, String value, boolean like,
222                    BooleanClauseOccur booleanClauseOccur) {
223    
224                    if (Validator.isNull(value)) {
225                            return;
226                    }
227    
228                    Analyzer analyzer = getAnalyzer();
229    
230                    if (analyzer instanceof PerFieldAnalyzer) {
231                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232    
233                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234    
235                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236                                    like = true;
237                            }
238                    }
239    
240                    if (like) {
241                            value = StringUtil.replace(
242                                    value, StringPool.PERCENT, StringPool.BLANK);
243                    }
244    
245                    try {
246                            QueryParser queryParser = new QueryParser(
247                                    getVersion(), field, analyzer);
248    
249                            Query query = queryParser.parse(value);
250    
251                            BooleanClause.Occur occur = null;
252    
253                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
254                                    occur = BooleanClause.Occur.MUST;
255                            }
256                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
257                                    occur = BooleanClause.Occur.MUST_NOT;
258                            }
259                            else {
260                                    occur = BooleanClause.Occur.SHOULD;
261                            }
262    
263                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
264                    }
265                    catch (Exception e) {
266                            if (_log.isWarnEnabled()) {
267                                    _log.warn(e, e);
268                            }
269                    }
270            }
271    
272            @Override
273            public void addTerm(
274                    BooleanQuery booleanQuery, String field, String[] values,
275                    boolean like) {
276    
277                    for (String value : values) {
278                            addTerm(booleanQuery, field, value, like);
279                    }
280            }
281    
282            @Override
283            public void cleanUp(IndexSearcher indexSearcher) {
284                    if (indexSearcher == null) {
285                            return;
286                    }
287    
288                    try {
289                            indexSearcher.close();
290    
291                            IndexReader indexReader = indexSearcher.getIndexReader();
292    
293                            if (indexReader != null) {
294                                    indexReader.close();
295                            }
296                    }
297                    catch (IOException ioe) {
298                            _log.error(ioe, ioe);
299                    }
300            }
301    
302            @Override
303            public int countScoredFieldNames(Query query, String[] filedNames) {
304                    int count = 0;
305    
306                    for (String fieldName : filedNames) {
307                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
308                                    query, false, fieldName);
309    
310                            if ((weightedTerms.length > 0) &&
311                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
312    
313                                    count++;
314                            }
315                    }
316    
317                    return count;
318            }
319    
320            @Override
321            public void delete(long companyId) {
322                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
323    
324                    if (indexAccessor == null) {
325                            return;
326                    }
327    
328                    indexAccessor.delete();
329            }
330    
331            @Override
332            public void deleteDocuments(long companyId, Term term) throws IOException {
333                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
334    
335                    if (indexAccessor == null) {
336                            return;
337                    }
338    
339                    indexAccessor.deleteDocuments(term);
340            }
341    
342            @Override
343            public void dumpIndex(long companyId, OutputStream outputStream)
344                    throws IOException {
345    
346                    long lastGeneration = getLastGeneration(companyId);
347    
348                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
349                            if (_log.isDebugEnabled()) {
350                                    _log.debug(
351                                            "Dump index from cluster is not enabled for " + companyId);
352                            }
353    
354                            return;
355                    }
356    
357                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
358    
359                    if (indexAccessor == null) {
360                            return;
361                    }
362    
363                    indexAccessor.dumpIndex(outputStream);
364            }
365    
366            @Override
367            public Analyzer getAnalyzer() {
368                    return _analyzer;
369            }
370    
371            @Override
372            public IndexAccessor getIndexAccessor(long companyId) {
373                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
374    
375                    if (indexAccessor != null) {
376                            return indexAccessor;
377                    }
378    
379                    synchronized (this) {
380                            indexAccessor = _indexAccessors.get(companyId);
381    
382                            if (indexAccessor == null) {
383                                    indexAccessor = new IndexAccessorImpl(companyId);
384    
385                                    if (isLoadIndexFromClusterEnabled()) {
386                                            indexAccessor = new SynchronizedIndexAccessorImpl(
387                                                    indexAccessor);
388    
389                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
390                                                    MessageValuesThreadLocal.getValue(
391                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
392    
393                                            if (clusterForwardMessage) {
394                                                    if (_log.isInfoEnabled()) {
395                                                            _log.info(
396                                                                    "Skip Luncene index files cluster loading " +
397                                                                            "since this is a manual reindex request");
398                                                    }
399                                            }
400                                            else {
401                                                    try {
402                                                            _loadIndexFromCluster(
403                                                                    indexAccessor,
404                                                                    indexAccessor.getLastGeneration());
405                                                    }
406                                                    catch (Exception e) {
407                                                            _log.error(
408                                                                    "Unable to load index for company " +
409                                                                            indexAccessor.getCompanyId(),
410                                                                    e);
411                                                    }
412                                            }
413                                    }
414    
415                                    _indexAccessors.put(companyId, indexAccessor);
416                            }
417                    }
418    
419                    return indexAccessor;
420            }
421    
422            @Override
423            public long getLastGeneration(long companyId) {
424                    if (!isLoadIndexFromClusterEnabled()) {
425                            return IndexAccessor.DEFAULT_LAST_GENERATION;
426                    }
427    
428                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
429    
430                    if (indexAccessor == null) {
431                            return IndexAccessor.DEFAULT_LAST_GENERATION;
432                    }
433    
434                    return indexAccessor.getLastGeneration();
435            }
436    
437            @Override
438            public InputStream getLoadIndexesInputStreamFromCluster(
439                            long companyId, Address bootupAddress)
440                    throws SystemException {
441    
442                    if (!isLoadIndexFromClusterEnabled()) {
443                            return null;
444                    }
445    
446                    InputStream inputStream = null;
447    
448                    try {
449                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
450                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
451    
452                            URL url = bootupClusterNodeObjectValuePair.getValue();
453    
454                            URLConnection urlConnection = url.openConnection();
455    
456                            urlConnection.setDoOutput(true);
457    
458                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
459                                    urlConnection.getOutputStream());
460    
461                            unsyncPrintWriter.write("transientToken=");
462                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
463                            unsyncPrintWriter.write("&companyId=");
464                            unsyncPrintWriter.write(String.valueOf(companyId));
465    
466                            unsyncPrintWriter.close();
467    
468                            inputStream = urlConnection.getInputStream();
469    
470                            return inputStream;
471                    }
472                    catch (IOException ioe) {
473                            throw new SystemException(ioe);
474                    }
475            }
476    
477            @Override
478            public Set<String> getQueryTerms(Query query) {
479                    String queryString = StringUtil.replace(
480                            query.toString(), StringPool.STAR, StringPool.BLANK);
481    
482                    Query tempQuery = null;
483    
484                    try {
485                            QueryParser queryParser = new QueryParser(
486                                    getVersion(), StringPool.BLANK, getAnalyzer());
487    
488                            tempQuery = queryParser.parse(queryString);
489                    }
490                    catch (Exception e) {
491                            if (_log.isWarnEnabled()) {
492                                    _log.warn("Unable to parse " + queryString);
493                            }
494    
495                            tempQuery = query;
496                    }
497    
498                    WeightedTerm[] weightedTerms = null;
499    
500                    for (String fieldName : Field.KEYWORDS) {
501                            weightedTerms = QueryTermExtractor.getTerms(
502                                    tempQuery, false, fieldName);
503    
504                            if (weightedTerms.length > 0) {
505                                    break;
506                            }
507                    }
508    
509                    Set<String> queryTerms = new HashSet<String>();
510    
511                    for (WeightedTerm weightedTerm : weightedTerms) {
512                            queryTerms.add(weightedTerm.getTerm());
513                    }
514    
515                    return queryTerms;
516            }
517    
518            @Override
519            public IndexSearcher getSearcher(long companyId, boolean readOnly)
520                    throws IOException {
521    
522                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
523    
524                    IndexReader indexReader = IndexReader.open(
525                            indexAccessor.getLuceneDir(), readOnly);
526    
527                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
528    
529                    indexSearcher.setDefaultFieldSortScoring(true, true);
530                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
531    
532                    return indexSearcher;
533            }
534    
535            @Override
536            public String getSnippet(
537                            Query query, String field, String s, int maxNumFragments,
538                            int fragmentLength, String fragmentSuffix, Formatter formatter)
539                    throws IOException {
540    
541                    QueryScorer queryScorer = new QueryScorer(query, field);
542    
543                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
544    
545                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
546    
547                    TokenStream tokenStream = getAnalyzer().tokenStream(
548                            field, new UnsyncStringReader(s));
549    
550                    try {
551                            String snippet = highlighter.getBestFragments(
552                                    tokenStream, s, maxNumFragments, fragmentSuffix);
553    
554                            if (Validator.isNotNull(snippet) &&
555                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
556                                    !s.equals(snippet)) {
557    
558                                    snippet = snippet.concat(fragmentSuffix);
559                            }
560    
561                            return snippet;
562                    }
563                    catch (InvalidTokenOffsetsException itoe) {
564                            throw new IOException(itoe.getMessage());
565                    }
566            }
567    
568            @Override
569            public Version getVersion() {
570                    return _version;
571            }
572    
573            @Override
574            public boolean isLoadIndexFromClusterEnabled() {
575                    if (PropsValues.CLUSTER_LINK_ENABLED &&
576                            PropsValues.LUCENE_REPLICATE_WRITE) {
577    
578                            return true;
579                    }
580    
581                    if (_log.isDebugEnabled()) {
582                            _log.debug("Load index from cluster is not enabled");
583                    }
584    
585                    return false;
586            }
587    
588            @Override
589            public void loadIndex(long companyId, InputStream inputStream)
590                    throws IOException {
591    
592                    if (!isLoadIndexFromClusterEnabled()) {
593                            return;
594                    }
595    
596                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
597    
598                    if (indexAccessor == null) {
599                            if (_log.isInfoEnabled()) {
600                                    _log.info(
601                                            "Skip loading Lucene index files for company " + companyId +
602                                                    " in favor of lazy loading");
603                            }
604    
605                            return;
606                    }
607    
608                    StopWatch stopWatch = null;
609    
610                    if (_log.isInfoEnabled()) {
611                            _log.info(
612                                    "Start loading Lucene index files for company " + companyId);
613    
614                            stopWatch = new StopWatch();
615    
616                            stopWatch.start();
617                    }
618    
619                    indexAccessor.loadIndex(inputStream);
620    
621                    if (_log.isInfoEnabled()) {
622                            _log.info(
623                                    "Finished loading index files for company " + companyId +
624                                            " in " + stopWatch.getTime() + " ms");
625                    }
626            }
627    
628            @Override
629            public void loadIndexesFromCluster(long companyId) throws SystemException {
630                    if (!isLoadIndexFromClusterEnabled()) {
631                            return;
632                    }
633    
634                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
635    
636                    if (indexAccessor == null) {
637                            return;
638                    }
639    
640                    long localLastGeneration = getLastGeneration(companyId);
641    
642                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
643            }
644    
645            public void setAnalyzer(Analyzer analyzer) {
646                    _analyzer = analyzer;
647            }
648    
649            public void setVersion(Version version) {
650                    _version = version;
651            }
652    
653            @Override
654            public void shutdown() {
655                    if (_luceneIndexThreadPoolExecutor != null) {
656                            _luceneIndexThreadPoolExecutor.shutdownNow();
657    
658                            try {
659                                    _luceneIndexThreadPoolExecutor.awaitTermination(
660                                            60, TimeUnit.SECONDS);
661                            }
662                            catch (InterruptedException ie) {
663                                    _log.error("Lucene indexer shutdown interrupted", ie);
664                            }
665                    }
666    
667                    if (isLoadIndexFromClusterEnabled()) {
668                            ClusterExecutorUtil.removeClusterEventListener(
669                                    _loadIndexClusterEventListener);
670                    }
671    
672                    MessageBus messageBus = MessageBusUtil.getMessageBus();
673    
674                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
675                            String searchReaderDestinationName =
676                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
677    
678                            Destination searchReaderDestination = messageBus.getDestination(
679                                    searchReaderDestinationName);
680    
681                            if (searchReaderDestination != null) {
682                                    searchReaderDestination.close(true);
683                            }
684                    }
685    
686                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
687                            indexAccessor.close();
688                    }
689            }
690    
691            @Override
692            public void shutdown(long companyId) {
693                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
694    
695                    _indexAccessors.remove(indexAccessor);
696    
697                    indexAccessor.close();
698            }
699    
700            @Override
701            public void startup(long companyId) {
702                    if (!PropsValues.INDEX_ON_STARTUP) {
703                            return;
704                    }
705    
706                    if (_log.isInfoEnabled()) {
707                            _log.info("Indexing Lucene on startup");
708                    }
709    
710                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
711    
712                    if (PropsValues.INDEX_WITH_THREAD) {
713                            if (_luceneIndexThreadPoolExecutor == null) {
714    
715                                    // This should never be null except for the case where
716                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
717                                    // PropsValues#INDEX_ON_STARTUP to true.
718    
719                                    _luceneIndexThreadPoolExecutor =
720                                            PortalExecutorManagerUtil.getPortalExecutor(
721                                                    LuceneHelperImpl.class.getName());
722                            }
723    
724                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
725                    }
726                    else {
727                            luceneIndexer.reindex();
728                    }
729            }
730    
731            @Override
732            public void updateDocument(long companyId, Term term, Document document)
733                    throws IOException {
734    
735                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
736    
737                    indexAccessor.updateDocument(term, document);
738            }
739    
740            private LuceneHelperImpl() {
741                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
742                            _luceneIndexThreadPoolExecutor =
743                                    PortalExecutorManagerUtil.getPortalExecutor(
744                                            LuceneHelperImpl.class.getName());
745                    }
746    
747                    if (isLoadIndexFromClusterEnabled()) {
748                            _loadIndexClusterEventListener =
749                                    new LoadIndexClusterEventListener();
750    
751                            ClusterExecutorUtil.addClusterEventListener(
752                                    _loadIndexClusterEventListener);
753                    }
754    
755                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
756            }
757    
758            private ObjectValuePair<String, URL>
759                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
760                    throws SystemException {
761    
762                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
763                            new MethodHandler(
764                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
765                            bootupAddress);
766    
767                    FutureClusterResponses futureClusterResponses =
768                            ClusterExecutorUtil.execute(clusterRequest);
769    
770                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
771                            futureClusterResponses.getPartialResults();
772    
773                    try {
774                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
775                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
776                                    TimeUnit.MILLISECONDS);
777    
778                            String transientToken = (String)clusterNodeResponse.getResult();
779    
780                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
781    
782                            InetAddress inetAddress = clusterNode.getInetAddress();
783    
784                            String fileName = PortalUtil.getPathContext();
785    
786                            if (!fileName.endsWith(StringPool.SLASH)) {
787                                    fileName = fileName.concat(StringPool.SLASH);
788                            }
789    
790                            fileName = fileName.concat("lucene/dump");
791    
792                            URL url = new URL(
793                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
794                                    fileName);
795    
796                            return new ObjectValuePair<String, URL>(transientToken, url);
797                    }
798                    catch (Exception e) {
799                            throw new SystemException(e);
800                    }
801            }
802    
803            private void _includeIfUnique(
804                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
805                    Query query, BooleanClause.Occur occur) {
806    
807                    if (query instanceof TermQuery) {
808                            Set<Term> terms = new HashSet<Term>();
809    
810                            TermQuery termQuery = (TermQuery)query;
811    
812                            termQuery.extractTerms(terms);
813    
814                            float boost = termQuery.getBoost();
815    
816                            for (Term term : terms) {
817                                    String termValue = term.text();
818    
819                                    if (like) {
820                                            termValue = termValue.toLowerCase(queryParser.getLocale());
821    
822                                            term = term.createTerm(
823                                                    StringPool.STAR.concat(termValue).concat(
824                                                            StringPool.STAR));
825    
826                                            query = new WildcardQuery(term);
827                                    }
828                                    else {
829                                            query = new TermQuery(term);
830                                    }
831    
832                                    query.setBoost(boost);
833    
834                                    boolean included = false;
835    
836                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
837                                            if (query.equals(booleanClause.getQuery())) {
838                                                    included = true;
839                                            }
840                                    }
841    
842                                    if (!included) {
843                                            booleanQuery.add(query, occur);
844                                    }
845                            }
846                    }
847                    else if (query instanceof BooleanQuery) {
848                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
849    
850                            BooleanQuery containerBooleanQuery = new BooleanQuery();
851    
852                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
853                                    _includeIfUnique(
854                                            containerBooleanQuery, like, queryParser,
855                                            booleanClause.getQuery(), booleanClause.getOccur());
856                            }
857    
858                            if (containerBooleanQuery.getClauses().length > 0) {
859                                    booleanQuery.add(containerBooleanQuery, occur);
860                            }
861                    }
862                    else {
863                            boolean included = false;
864    
865                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
866                                    if (query.equals(booleanClause.getQuery())) {
867                                            included = true;
868                                    }
869                            }
870    
871                            if (!included) {
872                                    booleanQuery.add(query, occur);
873                            }
874                    }
875            }
876    
877            private void _loadIndexFromCluster(
878                            IndexAccessor indexAccessor, long localLastGeneration)
879                    throws SystemException {
880    
881                    List<Address> clusterNodeAddresses =
882                            ClusterExecutorUtil.getClusterNodeAddresses();
883    
884                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
885    
886                    if (clusterNodeAddressesCount <= 1) {
887                            if (_log.isDebugEnabled()) {
888                                    _log.debug(
889                                            "Do not load indexes because there is either one portal " +
890                                                    "instance or no portal instances in the cluster");
891                            }
892    
893                            return;
894                    }
895    
896                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
897                            new MethodHandler(
898                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
899                            true);
900    
901                    ClusterExecutorUtil.execute(
902                            clusterRequest,
903                            new LoadIndexClusterResponseCallback(
904                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
905            }
906    
907            private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
908                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;
909    
910            private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
911                    GetterUtil.getInteger(
912                            PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
913                            BooleanQuery.getMaxClauseCount());
914    
915            private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
916    
917            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
918    
919            private static MethodKey _createTokenMethodKey = new MethodKey(
920                    TransientTokenUtil.class, "createToken", long.class);
921            private static MethodKey _getLastGenerationMethodKey = new MethodKey(
922                    LuceneHelperUtil.class, "getLastGeneration", long.class);
923    
924            private Analyzer _analyzer;
925            private Map<Long, IndexAccessor> _indexAccessors =
926                    new ConcurrentHashMap<Long, IndexAccessor>();
927            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
928            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
929            private Version _version;
930    
931            private class LoadIndexClusterEventListener
932                    implements ClusterEventListener {
933    
934                    @Override
935                    public void processClusterEvent(ClusterEvent clusterEvent) {
936                            ClusterEventType clusterEventType =
937                                    clusterEvent.getClusterEventType();
938    
939                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
940                                    return;
941                            }
942    
943                            List<Address> clusterNodeAddresses =
944                                    ClusterExecutorUtil.getClusterNodeAddresses();
945                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
946    
947                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
948                                    if (_log.isDebugEnabled()) {
949                                            _log.debug(
950                                                    "Number of original cluster members is greater than " +
951                                                            "one");
952                                    }
953    
954                                    return;
955                            }
956    
957                            long[] companyIds = PortalInstances.getCompanyIds();
958    
959                            for (long companyId : companyIds) {
960                                    loadIndexes(companyId);
961                            }
962    
963                            loadIndexes(CompanyConstants.SYSTEM);
964                    }
965    
966                    private void loadIndexes(long companyId) {
967                            long lastGeneration = getLastGeneration(companyId);
968    
969                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
970                                    return;
971                            }
972    
973                            try {
974                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
975                            }
976                            catch (Exception e) {
977                                    _log.error(
978                                            "Unable to load indexes for company " + companyId, e);
979                            }
980                    }
981    
982            }
983    
984            private class LoadIndexClusterResponseCallback
985                    extends BaseClusterResponseCallback {
986    
987                    public LoadIndexClusterResponseCallback(
988                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
989                            long localLastGeneration) {
990    
991                            _indexAccessor = indexAccessor;
992                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
993                            _localLastGeneration = localLastGeneration;
994    
995                            _companyId = _indexAccessor.getCompanyId();
996                    }
997    
998                    @Override
999                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1000                            Address bootupAddress = null;
1001    
1002                            do {
1003                                    _clusterNodeAddressesCount--;
1004    
1005                                    ClusterNodeResponse clusterNodeResponse = null;
1006    
1007                                    try {
1008                                            clusterNodeResponse = blockingQueue.poll(
1009                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1010                                                    TimeUnit.MILLISECONDS);
1011                                    }
1012                                    catch (Exception e) {
1013                                            _log.error("Unable to get cluster node response", e);
1014                                    }
1015    
1016                                    if (clusterNodeResponse == null) {
1017                                            if (_log.isDebugEnabled()) {
1018                                                    _log.debug(
1019                                                            "Unable to get cluster node response in " +
1020                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1021                                                                            TimeUnit.MILLISECONDS);
1022                                            }
1023    
1024                                            continue;
1025                                    }
1026    
1027                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1028    
1029                                    if (clusterNode.getPort() > 0) {
1030                                            try {
1031                                                    long remoteLastGeneration =
1032                                                            (Long)clusterNodeResponse.getResult();
1033    
1034                                                    if (remoteLastGeneration > _localLastGeneration) {
1035                                                            bootupAddress = clusterNodeResponse.getAddress();
1036    
1037                                                            break;
1038                                                    }
1039                                            }
1040                                            catch (Exception e) {
1041                                                    if (_log.isDebugEnabled()) {
1042                                                            _log.debug(
1043                                                                    "Suppress exception caused by remote method " +
1044                                                                            "invocation",
1045                                                                    e);
1046                                                    }
1047    
1048                                                    continue;
1049                                            }
1050                                    }
1051                                    else {
1052                                            if (_log.isDebugEnabled()) {
1053                                                    _log.debug(
1054                                                            "Cluster node " + clusterNode +
1055                                                                    " has invalid port");
1056                                            }
1057                                    }
1058                            }
1059                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1060    
1061                            if (bootupAddress == null) {
1062                                    return;
1063                            }
1064    
1065                            if (_log.isInfoEnabled()) {
1066                                    _log.info(
1067                                            "Start loading lucene index files from cluster node " +
1068                                                    bootupAddress);
1069                            }
1070    
1071                            InputStream inputStream = null;
1072    
1073                            try {
1074                                    inputStream = getLoadIndexesInputStreamFromCluster(
1075                                            _companyId, bootupAddress);
1076    
1077                                    _indexAccessor.loadIndex(inputStream);
1078    
1079                                    if (_log.isInfoEnabled()) {
1080                                            _log.info("Lucene index files loaded successfully");
1081                                    }
1082                            }
1083                            catch (Exception e) {
1084                                    _log.error("Unable to load index for company " + _companyId, e);
1085                            }
1086                            finally {
1087                                    if (inputStream != null) {
1088                                            try {
1089                                                    inputStream.close();
1090                                            }
1091                                            catch (IOException ioe) {
1092                                                    _log.error(
1093                                                            "Unable to close input stream for company " +
1094                                                                    _companyId,
1095                                                            ioe);
1096                                            }
1097                                    }
1098                            }
1099                    }
1100    
1101                    @Override
1102                    public void processTimeoutException(TimeoutException timeoutException) {
1103                            _log.error(
1104                                    "Unable to load index for company " + _companyId,
1105                                    timeoutException);
1106                    }
1107    
1108                    private int _clusterNodeAddressesCount;
1109                    private long _companyId;
1110                    private IndexAccessor _indexAccessor;
1111                    private long _localLastGeneration;
1112    
1113            }
1114    
1115    }