001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLink;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.PropsKeys;
044    import com.liferay.portal.kernel.util.StringPool;
045    import com.liferay.portal.kernel.util.StringUtil;
046    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
047    import com.liferay.portal.kernel.util.Validator;
048    import com.liferay.portal.model.CompanyConstants;
049    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
050    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
051    import com.liferay.portal.security.auth.TransientTokenUtil;
052    import com.liferay.portal.util.PortalInstances;
053    import com.liferay.portal.util.PortalUtil;
054    import com.liferay.portal.util.PropsUtil;
055    import com.liferay.portal.util.PropsValues;
056    import com.liferay.util.lucene.KeywordsUtil;
057    
058    import java.io.IOException;
059    import java.io.InputStream;
060    import java.io.OutputStream;
061    
062    import java.net.InetAddress;
063    import java.net.URL;
064    import java.net.URLConnection;
065    
066    import java.util.HashSet;
067    import java.util.List;
068    import java.util.Map;
069    import java.util.Set;
070    import java.util.concurrent.BlockingQueue;
071    import java.util.concurrent.ConcurrentHashMap;
072    import java.util.concurrent.TimeUnit;
073    import java.util.concurrent.TimeoutException;
074    
075    import org.apache.commons.lang.time.StopWatch;
076    import org.apache.lucene.analysis.Analyzer;
077    import org.apache.lucene.analysis.TokenStream;
078    import org.apache.lucene.document.Document;
079    import org.apache.lucene.index.IndexReader;
080    import org.apache.lucene.index.Term;
081    import org.apache.lucene.queryParser.QueryParser;
082    import org.apache.lucene.search.BooleanClause;
083    import org.apache.lucene.search.BooleanQuery;
084    import org.apache.lucene.search.IndexSearcher;
085    import org.apache.lucene.search.NumericRangeQuery;
086    import org.apache.lucene.search.Query;
087    import org.apache.lucene.search.TermQuery;
088    import org.apache.lucene.search.TermRangeQuery;
089    import org.apache.lucene.search.WildcardQuery;
090    import org.apache.lucene.search.highlight.Formatter;
091    import org.apache.lucene.search.highlight.Highlighter;
092    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
093    import org.apache.lucene.search.highlight.QueryScorer;
094    import org.apache.lucene.search.highlight.SimpleFragmenter;
095    import org.apache.lucene.search.highlight.WeightedTerm;
096    import org.apache.lucene.util.Version;
097    
098    /**
099     * @author Brian Wing Shun Chan
100     * @author Harry Mark
101     * @author Bruno Farache
102     * @author Shuyang Zhou
103     * @author Tina Tian
104     * @author Hugo Huijser
105     * @author Andrea Di Giorgi
106     */
107    public class LuceneHelperImpl implements LuceneHelper {
108    
109            @Override
110            public void addDocument(long companyId, Document document)
111                    throws IOException {
112    
113                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
114    
115                    indexAccessor.addDocument(document);
116            }
117    
118            @Override
119            public void addExactTerm(
120                    BooleanQuery booleanQuery, String field, String value) {
121    
122                    addTerm(booleanQuery, field, value, false);
123            }
124    
125            @Override
126            public void addNumericRangeTerm(
127                    BooleanQuery booleanQuery, String field, Integer startValue,
128                    Integer endValue) {
129    
130                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
131                            field, startValue, endValue, true, true);
132    
133                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
134            }
135    
136            @Override
137            public void addNumericRangeTerm(
138                    BooleanQuery booleanQuery, String field, Long startValue,
139                    Long endValue) {
140    
141                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
142                            field, startValue, endValue, true, true);
143    
144                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
145            }
146    
147            /**
148             * @deprecated As of 6.2.0, replaced by {@link
149             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
150             */
151            @Override
152            public void addNumericRangeTerm(
153                    BooleanQuery booleanQuery, String field, String startValue,
154                    String endValue) {
155    
156                    addNumericRangeTerm(
157                            booleanQuery, field, GetterUtil.getLong(startValue),
158                            GetterUtil.getLong(endValue));
159            }
160    
161            @Override
162            public void addRangeTerm(
163                    BooleanQuery booleanQuery, String field, String startValue,
164                    String endValue) {
165    
166                    boolean includesLower = true;
167    
168                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
169                            includesLower = false;
170                    }
171    
172                    boolean includesUpper = true;
173    
174                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
175                            includesUpper = false;
176                    }
177    
178                    TermRangeQuery termRangeQuery = new TermRangeQuery(
179                            field, startValue, endValue, includesLower, includesUpper);
180    
181                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
182            }
183    
184            @Override
185            public void addRequiredTerm(
186                    BooleanQuery booleanQuery, String field, String value, boolean like) {
187    
188                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
189            }
190    
191            @Override
192            public void addRequiredTerm(
193                    BooleanQuery booleanQuery, String field, String[] values,
194                    boolean like) {
195    
196                    if (values == null) {
197                            return;
198                    }
199    
200                    BooleanQuery query = new BooleanQuery();
201    
202                    for (String value : values) {
203                            addTerm(query, field, value, like);
204                    }
205    
206                    booleanQuery.add(query, BooleanClause.Occur.MUST);
207            }
208    
209            @Override
210            public void addTerm(
211                    BooleanQuery booleanQuery, String field, String value, boolean like) {
212    
213                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
214            }
215    
	/**
	 * Parses <code>value</code> for <code>field</code> with this helper's
	 * analyzer and adds the resulting query to <code>booleanQuery</code>
	 * using the requested occurrence.
	 *
	 * <p>
	 * Values for which {@link Validator#isNull(String)} is true are ignored.
	 * If the configured analyzer is a {@link PerFieldAnalyzer} whose analyzer
	 * for this field is a {@link LikeKeywordAnalyzer}, the term is treated as
	 * a "like" term regardless of the <code>like</code> argument. Any
	 * exception raised while parsing or adding the clause is logged at WARN
	 * and the term is skipped.
	 * </p>
	 *
	 * @param booleanQuery the query the parsed clause is added to
	 * @param field the default field for the query parser
	 * @param value the text to parse
	 * @param like whether to apply "like" handling (percent characters
	 *        removed, single terms lower-cased)
	 * @param booleanClauseOccur MUST or MUST_NOT map to the Lucene
	 *        equivalents; anything else maps to SHOULD
	 */
	@Override
	public void addTerm(
		BooleanQuery booleanQuery, String field, String value, boolean like,
		BooleanClauseOccur booleanClauseOccur) {

		if (Validator.isNull(value)) {
			return;
		}

		Analyzer analyzer = getAnalyzer();

		// Fields analyzed by a LikeKeywordAnalyzer always get "like" handling,
		// overriding the caller's flag

		if (analyzer instanceof PerFieldAnalyzer) {
			PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;

			Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);

			if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
				like = true;
			}
		}

		// Percent characters are removed from "like" values (presumably
		// SQL-style LIKE wildcards supplied by callers)

		if (like) {
			value = StringUtil.replace(
				value, StringPool.PERCENT, StringPool.BLANK);
		}

		try {
			QueryParser queryParser = new QueryParser(
				getVersion(), field, analyzer);

			Query query = queryParser.parse(value);

			try {
				if (like && (query instanceof TermQuery)) {

					// LUCENE-89

					// Rebuild the single term query with its text lower-cased
					// using the parser's locale, keeping the original boost

					TermQuery termQuery = (TermQuery)query;

					Term term = termQuery.getTerm();

					value = term.text();
					value = value.toLowerCase(queryParser.getLocale());

					query = new TermQuery(term.createTerm(value));

					query.setBoost(termQuery.getBoost());
				}
			}
			catch (Exception e) {

				// Fall back to parsing the escaped value. NOTE(review): value
				// may already hold the lower-cased term text here if the
				// failure happened after it was reassigned above.

				query = queryParser.parse(KeywordsUtil.escape(value));
			}

			BooleanClause.Occur occur = null;

			if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
				occur = BooleanClause.Occur.MUST;
			}
			else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
				occur = BooleanClause.Occur.MUST_NOT;
			}
			else {
				occur = BooleanClause.Occur.SHOULD;
			}

			// _includeIfUnique is defined elsewhere in this class; presumably
			// it skips queries already present in booleanQuery — confirm

			_includeIfUnique(booleanQuery, query, occur, like);
		}
		catch (Exception e) {

			// Best effort: an unparsable term is logged and skipped rather
			// than failing the whole query

			if (_log.isWarnEnabled()) {
				_log.warn(e, e);
			}
		}
	}
289    
290            @Override
291            public void addTerm(
292                    BooleanQuery booleanQuery, String field, String[] values,
293                    boolean like) {
294    
295                    for (String value : values) {
296                            addTerm(booleanQuery, field, value, like);
297                    }
298            }
299    
300            @Override
301            public void cleanUp(IndexSearcher indexSearcher) {
302                    if (indexSearcher == null) {
303                            return;
304                    }
305    
306                    try {
307                            indexSearcher.close();
308    
309                            IndexReader indexReader = indexSearcher.getIndexReader();
310    
311                            if (indexReader != null) {
312                                    indexReader.close();
313                            }
314                    }
315                    catch (IOException ioe) {
316                            _log.error(ioe, ioe);
317                    }
318            }
319    
320            @Override
321            public int countScoredFieldNames(Query query, String[] filedNames) {
322                    int count = 0;
323    
324                    for (String fieldName : filedNames) {
325                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
326                                    query, false, fieldName);
327    
328                            if ((weightedTerms.length > 0) &&
329                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
330    
331                                    count++;
332                            }
333                    }
334    
335                    return count;
336            }
337    
338            @Override
339            public void delete(long companyId) {
340                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
341    
342                    if (indexAccessor == null) {
343                            return;
344                    }
345    
346                    indexAccessor.delete();
347            }
348    
349            @Override
350            public void deleteDocuments(long companyId, Term term) throws IOException {
351                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
352    
353                    if (indexAccessor == null) {
354                            return;
355                    }
356    
357                    indexAccessor.deleteDocuments(term);
358            }
359    
360            @Override
361            public void dumpIndex(long companyId, OutputStream outputStream)
362                    throws IOException {
363    
364                    long lastGeneration = getLastGeneration(companyId);
365    
366                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
367                            if (_log.isDebugEnabled()) {
368                                    _log.debug(
369                                            "Dump index from cluster is not enabled for " + companyId);
370                            }
371    
372                            return;
373                    }
374    
375                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
376    
377                    if (indexAccessor == null) {
378                            return;
379                    }
380    
381                    indexAccessor.dumpIndex(outputStream);
382            }
383    
384            @Override
385            public Analyzer getAnalyzer() {
386                    return _analyzer;
387            }
388    
389            @Override
390            public IndexAccessor getIndexAccessor(long companyId) {
391                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
392    
393                    if (indexAccessor != null) {
394                            return indexAccessor;
395                    }
396    
397                    synchronized (this) {
398                            indexAccessor = _indexAccessors.get(companyId);
399    
400                            if (indexAccessor == null) {
401                                    indexAccessor = new IndexAccessorImpl(companyId);
402    
403                                    if (isLoadIndexFromClusterEnabled()) {
404                                            indexAccessor = new SynchronizedIndexAccessorImpl(
405                                                    indexAccessor);
406    
407                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
408                                                    MessageValuesThreadLocal.getValue(
409                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
410    
411                                            if (clusterForwardMessage) {
412                                                    if (_log.isInfoEnabled()) {
413                                                            _log.info(
414                                                                    "Skip Luncene index files cluster loading " +
415                                                                            "since this is a manual reindex request");
416                                                    }
417                                            }
418                                            else {
419                                                    try {
420                                                            _loadIndexFromCluster(
421                                                                    indexAccessor,
422                                                                    indexAccessor.getLastGeneration());
423                                                    }
424                                                    catch (Exception e) {
425                                                            _log.error(
426                                                                    "Unable to load index for company " +
427                                                                            indexAccessor.getCompanyId(),
428                                                                    e);
429                                                    }
430                                            }
431                                    }
432    
433                                    _indexAccessors.put(companyId, indexAccessor);
434                            }
435                    }
436    
437                    return indexAccessor;
438            }
439    
440            @Override
441            public long getLastGeneration(long companyId) {
442                    if (!isLoadIndexFromClusterEnabled()) {
443                            return IndexAccessor.DEFAULT_LAST_GENERATION;
444                    }
445    
446                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
447    
448                    if (indexAccessor == null) {
449                            return IndexAccessor.DEFAULT_LAST_GENERATION;
450                    }
451    
452                    return indexAccessor.getLastGeneration();
453            }
454    
455            @Override
456            public InputStream getLoadIndexesInputStreamFromCluster(
457                            long companyId, Address bootupAddress)
458                    throws SystemException {
459    
460                    if (!isLoadIndexFromClusterEnabled()) {
461                            return null;
462                    }
463    
464                    InputStream inputStream = null;
465    
466                    try {
467                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
468                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
469    
470                            URL url = bootupClusterNodeObjectValuePair.getValue();
471    
472                            URLConnection urlConnection = url.openConnection();
473    
474                            urlConnection.setDoOutput(true);
475    
476                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
477                                    urlConnection.getOutputStream());
478    
479                            unsyncPrintWriter.write("transientToken=");
480                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
481                            unsyncPrintWriter.write("&companyId=");
482                            unsyncPrintWriter.write(String.valueOf(companyId));
483    
484                            unsyncPrintWriter.close();
485    
486                            inputStream = urlConnection.getInputStream();
487    
488                            return inputStream;
489                    }
490                    catch (IOException ioe) {
491                            throw new SystemException(ioe);
492                    }
493            }
494    
495            @Override
496            public Set<String> getQueryTerms(Query query) {
497                    String queryString = StringUtil.replace(
498                            query.toString(), StringPool.STAR, StringPool.BLANK);
499    
500                    Query tempQuery = null;
501    
502                    try {
503                            QueryParser queryParser = new QueryParser(
504                                    getVersion(), StringPool.BLANK, getAnalyzer());
505    
506                            tempQuery = queryParser.parse(queryString);
507                    }
508                    catch (Exception e) {
509                            if (_log.isWarnEnabled()) {
510                                    _log.warn("Unable to parse " + queryString);
511                            }
512    
513                            tempQuery = query;
514                    }
515    
516                    WeightedTerm[] weightedTerms = null;
517    
518                    for (String fieldName : Field.KEYWORDS) {
519                            weightedTerms = QueryTermExtractor.getTerms(
520                                    tempQuery, false, fieldName);
521    
522                            if (weightedTerms.length > 0) {
523                                    break;
524                            }
525                    }
526    
527                    Set<String> queryTerms = new HashSet<String>();
528    
529                    for (WeightedTerm weightedTerm : weightedTerms) {
530                            queryTerms.add(weightedTerm.getTerm());
531                    }
532    
533                    return queryTerms;
534            }
535    
536            @Override
537            public IndexSearcher getSearcher(long companyId, boolean readOnly)
538                    throws IOException {
539    
540                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
541    
542                    IndexReader indexReader = IndexReader.open(
543                            indexAccessor.getLuceneDir(), readOnly);
544    
545                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
546    
547                    indexSearcher.setDefaultFieldSortScoring(true, true);
548                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
549    
550                    return indexSearcher;
551            }
552    
553            @Override
554            public String getSnippet(
555                            Query query, String field, String s, int maxNumFragments,
556                            int fragmentLength, String fragmentSuffix, Formatter formatter)
557                    throws IOException {
558    
559                    QueryScorer queryScorer = new QueryScorer(query, field);
560    
561                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
562    
563                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
564    
565                    TokenStream tokenStream = getAnalyzer().tokenStream(
566                            field, new UnsyncStringReader(s));
567    
568                    try {
569                            String snippet = highlighter.getBestFragments(
570                                    tokenStream, s, maxNumFragments, fragmentSuffix);
571    
572                            if (Validator.isNotNull(snippet) &&
573                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
574                                    !s.equals(snippet)) {
575    
576                                    snippet = snippet.concat(fragmentSuffix);
577                            }
578    
579                            return snippet;
580                    }
581                    catch (InvalidTokenOffsetsException itoe) {
582                            throw new IOException(itoe.getMessage());
583                    }
584            }
585    
586            @Override
587            public Version getVersion() {
588                    return _version;
589            }
590    
591            @Override
592            public boolean isLoadIndexFromClusterEnabled() {
593                    if (PropsValues.CLUSTER_LINK_ENABLED &&
594                            PropsValues.LUCENE_REPLICATE_WRITE) {
595    
596                            return true;
597                    }
598    
599                    if (_log.isDebugEnabled()) {
600                            _log.debug("Load index from cluster is not enabled");
601                    }
602    
603                    return false;
604            }
605    
606            @Override
607            public void loadIndex(long companyId, InputStream inputStream)
608                    throws IOException {
609    
610                    if (!isLoadIndexFromClusterEnabled()) {
611                            return;
612                    }
613    
614                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
615    
616                    if (indexAccessor == null) {
617                            if (_log.isInfoEnabled()) {
618                                    _log.info(
619                                            "Skip loading Lucene index files for company " + companyId +
620                                                    " in favor of lazy loading");
621                            }
622    
623                            return;
624                    }
625    
626                    StopWatch stopWatch = null;
627    
628                    if (_log.isInfoEnabled()) {
629                            _log.info(
630                                    "Start loading Lucene index files for company " + companyId);
631    
632                            stopWatch = new StopWatch();
633    
634                            stopWatch.start();
635                    }
636    
637                    indexAccessor.loadIndex(inputStream);
638    
639                    if (_log.isInfoEnabled()) {
640                            _log.info(
641                                    "Finished loading index files for company " + companyId +
642                                            " in " + stopWatch.getTime() + " ms");
643                    }
644            }
645    
646            @Override
647            public void loadIndexesFromCluster(long companyId) throws SystemException {
648                    if (!isLoadIndexFromClusterEnabled()) {
649                            return;
650                    }
651    
652                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
653    
654                    if (indexAccessor == null) {
655                            return;
656                    }
657    
658                    long localLastGeneration = getLastGeneration(companyId);
659    
660                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
661            }
662    
663            public void setAnalyzer(Analyzer analyzer) {
664                    _analyzer = analyzer;
665            }
666    
667            public void setVersion(Version version) {
668                    _version = version;
669            }
670    
671            @Override
672            public void shutdown() {
673                    if (_luceneIndexThreadPoolExecutor != null) {
674                            _luceneIndexThreadPoolExecutor.shutdownNow();
675    
676                            try {
677                                    _luceneIndexThreadPoolExecutor.awaitTermination(
678                                            60, TimeUnit.SECONDS);
679                            }
680                            catch (InterruptedException ie) {
681                                    _log.error("Lucene indexer shutdown interrupted", ie);
682                            }
683                    }
684    
685                    if (isLoadIndexFromClusterEnabled()) {
686                            ClusterExecutorUtil.removeClusterEventListener(
687                                    _loadIndexClusterEventListener);
688                    }
689    
690                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
691                            indexAccessor.close();
692                    }
693            }
694    
695            @Override
696            public void shutdown(long companyId) {
697                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
698    
699                    _indexAccessors.remove(indexAccessor);
700    
701                    indexAccessor.close();
702            }
703    
704            @Override
705            public void startup(long companyId) {
706                    if (!PropsValues.INDEX_ON_STARTUP) {
707                            return;
708                    }
709    
710                    if (_log.isInfoEnabled()) {
711                            _log.info("Indexing Lucene on startup");
712                    }
713    
714                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
715    
716                    if (PropsValues.INDEX_WITH_THREAD) {
717                            if (_luceneIndexThreadPoolExecutor == null) {
718    
719                                    // This should never be null except for the case where
720                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
721                                    // PropsValues#INDEX_ON_STARTUP to true.
722    
723                                    _luceneIndexThreadPoolExecutor =
724                                            PortalExecutorManagerUtil.getPortalExecutor(
725                                                    LuceneHelperImpl.class.getName());
726                            }
727    
728                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
729                    }
730                    else {
731                            luceneIndexer.reindex();
732                    }
733            }
734    
735            @Override
736            public void updateDocument(long companyId, Term term, Document document)
737                    throws IOException {
738    
739                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
740    
741                    indexAccessor.updateDocument(term, document);
742            }
743    
744            private LuceneHelperImpl() {
745                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
746                            _luceneIndexThreadPoolExecutor =
747                                    PortalExecutorManagerUtil.getPortalExecutor(
748                                            LuceneHelperImpl.class.getName());
749                    }
750    
751                    if (isLoadIndexFromClusterEnabled()) {
752                            _loadIndexClusterEventListener =
753                                    new LoadIndexClusterEventListener();
754    
755                            ClusterExecutorUtil.addClusterEventListener(
756                                    _loadIndexClusterEventListener);
757                    }
758    
759                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
760            }
761    
762            private ObjectValuePair<String, URL>
763                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
764                    throws SystemException {
765    
766                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
767                            new MethodHandler(
768                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
769                            bootupAddress);
770    
771                    FutureClusterResponses futureClusterResponses =
772                            ClusterExecutorUtil.execute(clusterRequest);
773    
774                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
775                            futureClusterResponses.getPartialResults();
776    
777                    try {
778                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
779                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
780                                    TimeUnit.MILLISECONDS);
781    
782                            String transientToken = (String)clusterNodeResponse.getResult();
783    
784                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
785    
786                            InetAddress inetAddress = clusterNode.getInetAddress();
787    
788                            String fileName = PortalUtil.getPathContext();
789    
790                            if (!fileName.endsWith(StringPool.SLASH)) {
791                                    fileName = fileName.concat(StringPool.SLASH);
792                            }
793    
794                            fileName = fileName.concat("lucene/dump");
795    
796                            URL url = new URL(
797                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
798                                    fileName);
799    
800                            return new ObjectValuePair<String, URL>(transientToken, url);
801                    }
802                    catch (Exception e) {
803                            throw new SystemException(e);
804                    }
805            }
806    
807            private void _includeIfUnique(
808                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
809                    boolean like) {
810    
811                    if (query instanceof TermQuery) {
812                            Set<Term> terms = new HashSet<Term>();
813    
814                            TermQuery termQuery = (TermQuery)query;
815    
816                            termQuery.extractTerms(terms);
817    
818                            float boost = termQuery.getBoost();
819    
820                            for (Term term : terms) {
821                                    String termValue = term.text();
822    
823                                    if (like) {
824                                            term = term.createTerm(
825                                                    StringPool.STAR.concat(termValue).concat(
826                                                            StringPool.STAR));
827    
828                                            query = new WildcardQuery(term);
829                                    }
830                                    else {
831                                            query = new TermQuery(term);
832                                    }
833    
834                                    query.setBoost(boost);
835    
836                                    boolean included = false;
837    
838                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
839                                            if (query.equals(booleanClause.getQuery())) {
840                                                    included = true;
841                                            }
842                                    }
843    
844                                    if (!included) {
845                                            booleanQuery.add(query, occur);
846                                    }
847                            }
848                    }
849                    else if (query instanceof BooleanQuery) {
850                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
851    
852                            BooleanQuery containerBooleanQuery = new BooleanQuery();
853    
854                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
855                                    _includeIfUnique(
856                                            containerBooleanQuery, booleanClause.getQuery(),
857                                            booleanClause.getOccur(), like);
858                            }
859    
860                            if (containerBooleanQuery.getClauses().length > 0) {
861                                    booleanQuery.add(containerBooleanQuery, occur);
862                            }
863                    }
864                    else {
865                            boolean included = false;
866    
867                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
868                                    if (query.equals(booleanClause.getQuery())) {
869                                            included = true;
870                                    }
871                            }
872    
873                            if (!included) {
874                                    booleanQuery.add(query, occur);
875                            }
876                    }
877            }
878    
879            private void _loadIndexFromCluster(
880                            IndexAccessor indexAccessor, long localLastGeneration)
881                    throws SystemException {
882    
883                    List<Address> clusterNodeAddresses =
884                            ClusterExecutorUtil.getClusterNodeAddresses();
885    
886                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
887    
888                    if (clusterNodeAddressesCount <= 1) {
889                            if (_log.isDebugEnabled()) {
890                                    _log.debug(
891                                            "Do not load indexes because there is either one portal " +
892                                                    "instance or no portal instances in the cluster");
893                            }
894    
895                            return;
896                    }
897    
898                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
899                            new MethodHandler(
900                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
901                            true);
902    
903                    ClusterExecutorUtil.execute(
904                            clusterRequest,
905                            new LoadIndexClusterResponseCallback(
906                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
907            }
908    
	// Maximum wait, in milliseconds, for a single cluster node response. It is
	// passed to BlockingQueue#poll together with TimeUnit.MILLISECONDS.

	private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
		PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

	// Clause limit applied through BooleanQuery#setMaxClauseCount in the
	// constructor. Lucene's current limit is the default handed to
	// GetterUtil#getInteger, presumably used when the property is unset or not
	// an integer.

	private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
		GetterUtil.getInteger(
			PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
			BooleanQuery.getMaxClauseCount());

	// Argument passed to the remote TransientTokenUtil#createToken(long) call.
	// NOTE(review): presumably a keep-alive in milliseconds; confirm in
	// TransientTokenUtil.

	private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

	private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

	// Method keys invoked on other cluster nodes through MethodHandler

	private static MethodKey _createTokenMethodKey = new MethodKey(
		TransientTokenUtil.class, "createToken", long.class);
	private static MethodKey _getLastGenerationMethodKey = new MethodKey(
		LuceneHelperUtil.class, "getLastGeneration", long.class);

	private Analyzer _analyzer;

	// Cached index accessors; presumably keyed by company ID (shutdown(long)
	// looks one up by companyId).

	private Map<Long, IndexAccessor> _indexAccessors =
		new ConcurrentHashMap<Long, IndexAccessor>();

	// Registered in the constructor only when isLoadIndexFromClusterEnabled()
	// returns true; otherwise left null.

	private LoadIndexClusterEventListener _loadIndexClusterEventListener;

	// Executor for threaded startup indexing. It is created in the constructor
	// or on demand in startup(long).

	private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
	private Version _version;
932    
933            private class LoadIndexClusterEventListener
934                    implements ClusterEventListener {
935    
936                    @Override
937                    public void processClusterEvent(ClusterEvent clusterEvent) {
938                            ClusterEventType clusterEventType =
939                                    clusterEvent.getClusterEventType();
940    
941                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
942                                    return;
943                            }
944    
945                            List<Address> clusterNodeAddresses =
946                                    ClusterExecutorUtil.getClusterNodeAddresses();
947                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
948    
949                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
950                                    if (_log.isDebugEnabled()) {
951                                            _log.debug(
952                                                    "Number of original cluster members is greater than " +
953                                                            "one");
954                                    }
955    
956                                    return;
957                            }
958    
959                            long[] companyIds = PortalInstances.getCompanyIds();
960    
961                            for (long companyId : companyIds) {
962                                    loadIndexes(companyId);
963                            }
964    
965                            loadIndexes(CompanyConstants.SYSTEM);
966                    }
967    
968                    private void loadIndexes(long companyId) {
969                            long lastGeneration = getLastGeneration(companyId);
970    
971                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
972                                    return;
973                            }
974    
975                            try {
976                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
977                            }
978                            catch (Exception e) {
979                                    _log.error(
980                                            "Unable to load indexes for company " + companyId, e);
981                            }
982                    }
983    
984            }
985    
986            private class LoadIndexClusterResponseCallback
987                    extends BaseClusterResponseCallback {
988    
989                    public LoadIndexClusterResponseCallback(
990                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
991                            long localLastGeneration) {
992    
993                            _indexAccessor = indexAccessor;
994                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
995                            _localLastGeneration = localLastGeneration;
996    
997                            _companyId = _indexAccessor.getCompanyId();
998                    }
999    
1000                    @Override
1001                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
1002                            Address bootupAddress = null;
1003    
1004                            do {
1005                                    _clusterNodeAddressesCount--;
1006    
1007                                    ClusterNodeResponse clusterNodeResponse = null;
1008    
1009                                    try {
1010                                            clusterNodeResponse = blockingQueue.poll(
1011                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
1012                                                    TimeUnit.MILLISECONDS);
1013                                    }
1014                                    catch (Exception e) {
1015                                            _log.error("Unable to get cluster node response", e);
1016                                    }
1017    
1018                                    if (clusterNodeResponse == null) {
1019                                            if (_log.isDebugEnabled()) {
1020                                                    _log.debug(
1021                                                            "Unable to get cluster node response in " +
1022                                                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
1023                                                                            TimeUnit.MILLISECONDS);
1024                                            }
1025    
1026                                            continue;
1027                                    }
1028    
1029                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
1030    
1031                                    if (clusterNode.getPort() > 0) {
1032                                            try {
1033                                                    long remoteLastGeneration =
1034                                                            (Long)clusterNodeResponse.getResult();
1035    
1036                                                    if (remoteLastGeneration > _localLastGeneration) {
1037                                                            bootupAddress = clusterNodeResponse.getAddress();
1038    
1039                                                            break;
1040                                                    }
1041                                            }
1042                                            catch (Exception e) {
1043                                                    if (_log.isDebugEnabled()) {
1044                                                            _log.debug(
1045                                                                    "Suppress exception caused by remote method " +
1046                                                                            "invocation",
1047                                                                    e);
1048                                                    }
1049    
1050                                                    continue;
1051                                            }
1052                                    }
1053                                    else {
1054                                            if (_log.isDebugEnabled()) {
1055                                                    _log.debug(
1056                                                            "Cluster node " + clusterNode +
1057                                                                    " has invalid port");
1058                                            }
1059                                    }
1060                            }
1061                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
1062    
1063                            if (bootupAddress == null) {
1064                                    return;
1065                            }
1066    
1067                            if (_log.isInfoEnabled()) {
1068                                    _log.info(
1069                                            "Start loading lucene index files from cluster node " +
1070                                                    bootupAddress);
1071                            }
1072    
1073                            InputStream inputStream = null;
1074    
1075                            try {
1076                                    inputStream = getLoadIndexesInputStreamFromCluster(
1077                                            _companyId, bootupAddress);
1078    
1079                                    _indexAccessor.loadIndex(inputStream);
1080    
1081                                    if (_log.isInfoEnabled()) {
1082                                            _log.info("Lucene index files loaded successfully");
1083                                    }
1084                            }
1085                            catch (Exception e) {
1086                                    _log.error("Unable to load index for company " + _companyId, e);
1087                            }
1088                            finally {
1089                                    if (inputStream != null) {
1090                                            try {
1091                                                    inputStream.close();
1092                                            }
1093                                            catch (IOException ioe) {
1094                                                    _log.error(
1095                                                            "Unable to close input stream for company " +
1096                                                                    _companyId,
1097                                                            ioe);
1098                                            }
1099                                    }
1100                            }
1101                    }
1102    
1103                    @Override
1104                    public void processTimeoutException(TimeoutException timeoutException) {
1105                            _log.error(
1106                                    "Unable to load index for company " + _companyId,
1107                                    timeoutException);
1108                    }
1109    
1110                    private int _clusterNodeAddressesCount;
1111                    private long _companyId;
1112                    private IndexAccessor _indexAccessor;
1113                    private long _localLastGeneration;
1114    
1115            }
1116    
1117    }