001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterEventType;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNode;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
032    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
036    import com.liferay.portal.kernel.search.BooleanClauseOccur;
037    import com.liferay.portal.kernel.search.Field;
038    import com.liferay.portal.kernel.util.ArrayUtil;
039    import com.liferay.portal.kernel.util.GetterUtil;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.StringPool;
044    import com.liferay.portal.kernel.util.StringUtil;
045    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
046    import com.liferay.portal.kernel.util.Validator;
047    import com.liferay.portal.model.CompanyConstants;
048    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
049    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
050    import com.liferay.portal.security.auth.TransientTokenUtil;
051    import com.liferay.portal.util.PortalInstances;
052    import com.liferay.portal.util.PropsValues;
053    import com.liferay.util.lucene.KeywordsUtil;
054    
055    import java.io.IOException;
056    import java.io.InputStream;
057    import java.io.OutputStream;
058    
059    import java.net.InetAddress;
060    import java.net.URL;
061    import java.net.URLConnection;
062    
063    import java.util.HashSet;
064    import java.util.List;
065    import java.util.Map;
066    import java.util.Set;
067    import java.util.concurrent.BlockingQueue;
068    import java.util.concurrent.ConcurrentHashMap;
069    import java.util.concurrent.TimeUnit;
070    import java.util.concurrent.TimeoutException;
071    
072    import org.apache.commons.lang.time.StopWatch;
073    import org.apache.lucene.analysis.Analyzer;
074    import org.apache.lucene.analysis.TokenStream;
075    import org.apache.lucene.document.Document;
076    import org.apache.lucene.index.IndexReader;
077    import org.apache.lucene.index.Term;
078    import org.apache.lucene.queryParser.QueryParser;
079    import org.apache.lucene.search.BooleanClause;
080    import org.apache.lucene.search.BooleanQuery;
081    import org.apache.lucene.search.IndexSearcher;
082    import org.apache.lucene.search.NumericRangeQuery;
083    import org.apache.lucene.search.Query;
084    import org.apache.lucene.search.TermQuery;
085    import org.apache.lucene.search.TermRangeQuery;
086    import org.apache.lucene.search.WildcardQuery;
087    import org.apache.lucene.search.highlight.Formatter;
088    import org.apache.lucene.search.highlight.Highlighter;
089    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
090    import org.apache.lucene.search.highlight.QueryScorer;
091    import org.apache.lucene.search.highlight.SimpleFragmenter;
092    import org.apache.lucene.search.highlight.WeightedTerm;
093    import org.apache.lucene.util.Version;
094    
095    /**
096     * @author Brian Wing Shun Chan
097     * @author Harry Mark
098     * @author Bruno Farache
099     * @author Shuyang Zhou
100     * @author Tina Tian
101     * @author Hugo Huijser
102     */
103    public class LuceneHelperImpl implements LuceneHelper {
104    
105            public void addDocument(long companyId, Document document)
106                    throws IOException {
107    
108                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
109    
110                    indexAccessor.addDocument(document);
111            }
112    
113            public void addExactTerm(
114                    BooleanQuery booleanQuery, String field, String value) {
115    
116                    addTerm(booleanQuery, field, value, false);
117            }
118    
119            public void addNumericRangeTerm(
120                    BooleanQuery booleanQuery, String field, String startValue,
121                    String endValue) {
122    
123                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
124                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
125                            true, true);
126    
127                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
128            }
129    
130            public void addRangeTerm(
131                    BooleanQuery booleanQuery, String field, String startValue,
132                    String endValue) {
133    
134                    boolean includesLower = true;
135    
136                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
137                            includesLower = false;
138                    }
139    
140                    boolean includesUpper = true;
141    
142                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
143                            includesUpper = false;
144                    }
145    
146                    TermRangeQuery termRangeQuery = new TermRangeQuery(
147                            field, startValue, endValue, includesLower, includesUpper);
148    
149                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
150            }
151    
152            public void addRequiredTerm(
153                    BooleanQuery booleanQuery, String field, String value, boolean like) {
154    
155                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
156            }
157    
158            public void addRequiredTerm(
159                    BooleanQuery booleanQuery, String field, String[] values,
160                    boolean like) {
161    
162                    if (values == null) {
163                            return;
164                    }
165    
166                    BooleanQuery query = new BooleanQuery();
167    
168                    for (String value : values) {
169                            addTerm(query, field, value, like);
170                    }
171    
172                    booleanQuery.add(query, BooleanClause.Occur.MUST);
173            }
174    
175            public void addTerm(
176                    BooleanQuery booleanQuery, String field, String value, boolean like) {
177    
178                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
179            }
180    
181            public void addTerm(
182                    BooleanQuery booleanQuery, String field, String value, boolean like,
183                    BooleanClauseOccur booleanClauseOccur) {
184    
185                    if (Validator.isNull(value)) {
186                            return;
187                    }
188    
189                    Analyzer analyzer = getAnalyzer();
190    
191                    if (analyzer instanceof PerFieldAnalyzer) {
192                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
193    
194                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
195    
196                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
197                                    like = true;
198                            }
199                    }
200    
201                    if (like) {
202                            value = StringUtil.replace(
203                                    value, StringPool.PERCENT, StringPool.BLANK);
204                    }
205    
206                    try {
207                            QueryParser queryParser = new QueryParser(
208                                    getVersion(), field, analyzer);
209    
210                            Query query = null;
211    
212                            try {
213                                    if (like) {
214    
215                                            // LUCENE-89
216    
217                                            value = value.toLowerCase(queryParser.getLocale());
218                                    }
219    
220                                    query = queryParser.parse(value);
221                            }
222                            catch (Exception e) {
223                                    query = queryParser.parse(KeywordsUtil.escape(value));
224                            }
225    
226                            BooleanClause.Occur occur = null;
227    
228                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
229                                    occur = BooleanClause.Occur.MUST;
230                            }
231                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
232                                    occur = BooleanClause.Occur.MUST_NOT;
233                            }
234                            else {
235                                    occur = BooleanClause.Occur.SHOULD;
236                            }
237    
238                            _includeIfUnique(booleanQuery, query, occur, like);
239                    }
240                    catch (Exception e) {
241                            _log.error(e, e);
242                    }
243            }
244    
245            public void addTerm(
246                    BooleanQuery booleanQuery, String field, String[] values,
247                    boolean like) {
248    
249                    for (String value : values) {
250                            addTerm(booleanQuery, field, value, like);
251                    }
252            }
253    
254            public int countScoredFieldNames(Query query, String[] filedNames) {
255                    int count = 0;
256    
257                    for (String fieldName : filedNames) {
258                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
259                                    query, false, fieldName);
260    
261                            if ((weightedTerms.length > 0) &&
262                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
263    
264                                    count++;
265                            }
266                    }
267    
268                    return count;
269            }
270    
271            public void delete(long companyId) {
272                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
273    
274                    if (indexAccessor == null) {
275                            return;
276                    }
277    
278                    indexAccessor.delete();
279            }
280    
281            public void deleteDocuments(long companyId, Term term) throws IOException {
282                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
283    
284                    if (indexAccessor == null) {
285                            return;
286                    }
287    
288                    indexAccessor.deleteDocuments(term);
289            }
290    
291            public void dumpIndex(long companyId, OutputStream outputStream)
292                    throws IOException {
293    
294                    long lastGeneration = getLastGeneration(companyId);
295    
296                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
297                            if (_log.isDebugEnabled()) {
298                                    _log.debug(
299                                            "Dump index from cluster is not enabled for " + companyId);
300                            }
301    
302                            return;
303                    }
304    
305                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
306    
307                    if (indexAccessor == null) {
308                            return;
309                    }
310    
311                    indexAccessor.dumpIndex(outputStream);
312            }
313    
314            public Analyzer getAnalyzer() {
315                    return _analyzer;
316            }
317    
318            public long getLastGeneration(long companyId) {
319                    if (!isLoadIndexFromClusterEnabled()) {
320                            return IndexAccessor.DEFAULT_LAST_GENERATION;
321                    }
322    
323                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
324    
325                    if (indexAccessor == null) {
326                            return IndexAccessor.DEFAULT_LAST_GENERATION;
327                    }
328    
329                    return indexAccessor.getLastGeneration();
330            }
331    
332            public InputStream getLoadIndexesInputStreamFromCluster(
333                            long companyId, Address bootupAddress)
334                    throws SystemException {
335    
336                    if (!isLoadIndexFromClusterEnabled()) {
337                            return null;
338                    }
339    
340                    InputStream inputStream = null;
341    
342                    try {
343                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
344                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
345    
346                            URL url = bootupClusterNodeObjectValuePair.getValue();
347    
348                            URLConnection urlConnection = url.openConnection();
349    
350                            urlConnection.setDoOutput(true);
351    
352                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
353                                    urlConnection.getOutputStream());
354    
355                            unsyncPrintWriter.write("transientToken=");
356                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
357                            unsyncPrintWriter.write("&companyId=");
358                            unsyncPrintWriter.write(String.valueOf(companyId));
359    
360                            unsyncPrintWriter.close();
361    
362                            inputStream = urlConnection.getInputStream();
363    
364                            return inputStream;
365                    }
366                    catch (IOException ioe) {
367                            throw new SystemException(ioe);
368                    }
369            }
370    
371            public Set<String> getQueryTerms(Query query) {
372                    String queryString = StringUtil.replace(
373                            query.toString(), StringPool.STAR, StringPool.BLANK);
374    
375                    Query tempQuery = null;
376    
377                    try {
378                            QueryParser queryParser = new QueryParser(
379                                    getVersion(), StringPool.BLANK, getAnalyzer());
380    
381                            tempQuery = queryParser.parse(queryString);
382                    }
383                    catch (Exception e) {
384                            if (_log.isWarnEnabled()) {
385                                    _log.warn("Unable to parse " + queryString);
386                            }
387    
388                            tempQuery = query;
389                    }
390    
391                    WeightedTerm[] weightedTerms = null;
392    
393                    for (String fieldName : Field.KEYWORDS) {
394                            weightedTerms = QueryTermExtractor.getTerms(
395                                    tempQuery, false, fieldName);
396    
397                            if (weightedTerms.length > 0) {
398                                    break;
399                            }
400                    }
401    
402                    Set<String> queryTerms = new HashSet<String>();
403    
404                    for (WeightedTerm weightedTerm : weightedTerms) {
405                            queryTerms.add(weightedTerm.getTerm());
406                    }
407    
408                    return queryTerms;
409            }
410    
411            public IndexSearcher getSearcher(long companyId, boolean readOnly)
412                    throws IOException {
413    
414                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
415    
416                    IndexReader indexReader = IndexReader.open(
417                            indexAccessor.getLuceneDir(), readOnly);
418    
419                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
420    
421                    indexSearcher.setDefaultFieldSortScoring(true, true);
422                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
423    
424                    return indexSearcher;
425            }
426    
427            public String getSnippet(
428                            Query query, String field, String s, int maxNumFragments,
429                            int fragmentLength, String fragmentSuffix, Formatter formatter)
430                    throws IOException {
431    
432                    QueryScorer queryScorer = new QueryScorer(query, field);
433    
434                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
435    
436                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
437    
438                    TokenStream tokenStream = getAnalyzer().tokenStream(
439                            field, new UnsyncStringReader(s));
440    
441                    try {
442                            String snippet = highlighter.getBestFragments(
443                                    tokenStream, s, maxNumFragments, fragmentSuffix);
444    
445                            if (Validator.isNotNull(snippet) &&
446                                    !StringUtil.endsWith(snippet, fragmentSuffix)) {
447    
448                                    snippet = snippet.concat(fragmentSuffix);
449                            }
450    
451                            return snippet;
452                    }
453                    catch (InvalidTokenOffsetsException itoe) {
454                            throw new IOException(itoe.getMessage());
455                    }
456            }
457    
458            public Version getVersion() {
459                    return _version;
460            }
461    
462            public boolean isLoadIndexFromClusterEnabled() {
463                    if (PropsValues.CLUSTER_LINK_ENABLED &&
464                            PropsValues.LUCENE_REPLICATE_WRITE) {
465    
466                            return true;
467                    }
468    
469                    if (_log.isDebugEnabled()) {
470                            _log.debug("Load index from cluster is not enabled");
471                    }
472    
473                    return false;
474            }
475    
476            public void loadIndex(long companyId, InputStream inputStream)
477                    throws IOException {
478    
479                    if (!isLoadIndexFromClusterEnabled()) {
480                            return;
481                    }
482    
483                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
484    
485                    if (indexAccessor == null) {
486                            if (_log.isInfoEnabled()) {
487                                    _log.info(
488                                            "Skip loading Lucene index files for company " + companyId +
489                                                    " in favor of lazy loading");
490                            }
491    
492                            return;
493                    }
494    
495                    StopWatch stopWatch = null;
496    
497                    if (_log.isInfoEnabled()) {
498                            _log.info(
499                                    "Start loading Lucene index files for company " + companyId);
500    
501                            stopWatch = new StopWatch();
502    
503                            stopWatch.start();
504                    }
505    
506                    indexAccessor.loadIndex(inputStream);
507    
508                    if (_log.isInfoEnabled()) {
509                            _log.info(
510                                    "Finished loading index files for company " + companyId +
511                                            " in " + stopWatch.getTime() + " ms");
512                    }
513            }
514    
515            public void loadIndexesFromCluster(long companyId) throws SystemException {
516                    if (!isLoadIndexFromClusterEnabled()) {
517                            return;
518                    }
519    
520                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
521    
522                    if (indexAccessor == null) {
523                            return;
524                    }
525    
526                    long localLastGeneration = getLastGeneration(companyId);
527    
528                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
529            }
530    
531            public void setAnalyzer(Analyzer analyzer) {
532                    _analyzer = analyzer;
533            }
534    
535            public void setVersion(Version version) {
536                    _version = version;
537            }
538    
539            public void shutdown() {
540                    if (_luceneIndexThreadPoolExecutor != null) {
541                            _luceneIndexThreadPoolExecutor.shutdownNow();
542    
543                            try {
544                                    _luceneIndexThreadPoolExecutor.awaitTermination(
545                                            60, TimeUnit.SECONDS);
546                            }
547                            catch (InterruptedException ie) {
548                                    _log.error("Lucene indexer shutdown interrupted", ie);
549                            }
550                    }
551    
552                    if (isLoadIndexFromClusterEnabled()) {
553                            ClusterExecutorUtil.removeClusterEventListener(
554                                    _loadIndexClusterEventListener);
555                    }
556    
557                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
558                            indexAccessor.close();
559                    }
560            }
561    
562            public void startup(long companyId) {
563                    if (PropsValues.INDEX_ON_STARTUP) {
564                            if (_log.isInfoEnabled()) {
565                                    _log.info("Indexing Lucene on startup");
566                            }
567    
568                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
569    
570                            if (PropsValues.INDEX_WITH_THREAD) {
571                                    if (_luceneIndexThreadPoolExecutor == null) {
572    
573                                            // This should never be null except for the case where
574                                            // VerifyProcessUtil#_verifyProcess(boolean) sets
575                                            // PropsValues#INDEX_ON_STARTUP to true.
576    
577                                            _luceneIndexThreadPoolExecutor =
578                                                    PortalExecutorManagerUtil.getPortalExecutor(
579                                                            LuceneHelperImpl.class.getName());
580                                    }
581    
582                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
583                            }
584                            else {
585                                    luceneIndexer.reindex();
586                            }
587                    }
588            }
589    
590            public void updateDocument(long companyId, Term term, Document document)
591                    throws IOException {
592    
593                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
594    
595                    indexAccessor.updateDocument(term, document);
596            }
597    
598            private LuceneHelperImpl() {
599                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
600                            _luceneIndexThreadPoolExecutor =
601                                    PortalExecutorManagerUtil.getPortalExecutor(
602                                            LuceneHelperImpl.class.getName());
603                    }
604    
605                    if (isLoadIndexFromClusterEnabled()) {
606                            _loadIndexClusterEventListener =
607                                    new LoadIndexClusterEventListener();
608    
609                            ClusterExecutorUtil.addClusterEventListener(
610                                    _loadIndexClusterEventListener);
611                    }
612            }
613    
614            private ObjectValuePair<String, URL>
615                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
616                    throws SystemException {
617    
618                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
619                            new MethodHandler(
620                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
621                            bootupAddress);
622    
623                    FutureClusterResponses futureClusterResponses =
624                            ClusterExecutorUtil.execute(clusterRequest);
625    
626                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
627                            futureClusterResponses.getPartialResults();
628    
629                    try {
630                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
631                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
632    
633                            String transientToken = (String)clusterNodeResponse.getResult();
634    
635                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
636    
637                            InetAddress inetAddress = clusterNode.getInetAddress();
638    
639                            URL url = new URL(
640                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
641                                    "/lucene/dump");
642    
643                            return new ObjectValuePair<String, URL>(transientToken, url);
644                    }
645                    catch (Exception e) {
646                            throw new SystemException(e);
647                    }
648            }
649    
650            private IndexAccessor _getIndexAccessor(long companyId) {
651                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652    
653                    if (indexAccessor != null) {
654                            return indexAccessor;
655                    }
656    
657                    synchronized (this) {
658                            indexAccessor = _indexAccessors.get(companyId);
659    
660                            if (indexAccessor == null) {
661                                    indexAccessor = new IndexAccessorImpl(companyId);
662    
663                                    if (isLoadIndexFromClusterEnabled()) {
664                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
665                                                    MessageValuesThreadLocal.getValue(
666                                                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE));
667    
668                                            if (clusterForwardMessage) {
669                                                    if (_log.isInfoEnabled()) {
670                                                            _log.info(
671                                                                    "Skip Luncene index files cluster loading " +
672                                                                            "since this is a manual reindex request");
673                                                    }
674                                            }
675                                            else {
676                                                    indexAccessor = new SynchronizedIndexAccessorImpl(
677                                                            indexAccessor);
678    
679                                                    try {
680                                                            _loadIndexFromCluster(
681                                                                    indexAccessor,
682                                                                    indexAccessor.getLastGeneration());
683                                                    }
684                                                    catch (Exception e) {
685                                                            _log.error(
686                                                                    "Unable to load index for company " +
687                                                                            indexAccessor.getCompanyId(),
688                                                                    e);
689                                                    }
690                                            }
691                                    }
692    
693                                    _indexAccessors.put(companyId, indexAccessor);
694                            }
695                    }
696    
697                    return indexAccessor;
698            }
699    
700            private void _includeIfUnique(
701                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
702                    boolean like) {
703    
704                    if (query instanceof TermQuery) {
705                            Set<Term> terms = new HashSet<Term>();
706    
707                            query.extractTerms(terms);
708    
709                            for (Term term : terms) {
710                                    String termValue = term.text();
711    
712                                    if (like) {
713                                            term = term.createTerm(
714                                                    StringPool.STAR.concat(termValue).concat(
715                                                            StringPool.STAR));
716    
717                                            query = new WildcardQuery(term);
718                                    }
719                                    else {
720                                            query = new TermQuery(term);
721                                    }
722    
723                                    boolean included = false;
724    
725                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
726                                            if (query.equals(booleanClause.getQuery())) {
727                                                    included = true;
728                                            }
729                                    }
730    
731                                    if (!included) {
732                                            booleanQuery.add(query, occur);
733                                    }
734                            }
735                    }
736                    else if (query instanceof BooleanQuery) {
737                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
738    
739                            BooleanQuery containerBooleanQuery = new BooleanQuery();
740    
741                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
742                                    _includeIfUnique(
743                                            containerBooleanQuery, booleanClause.getQuery(),
744                                            booleanClause.getOccur(), like);
745                            }
746    
747                            if (containerBooleanQuery.getClauses().length > 0) {
748                                    booleanQuery.add(containerBooleanQuery, occur);
749                            }
750                    }
751                    else {
752                            boolean included = false;
753    
754                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
755                                    if (query.equals(booleanClause.getQuery())) {
756                                            included = true;
757                                    }
758                            }
759    
760                            if (!included) {
761                                    booleanQuery.add(query, occur);
762                            }
763                    }
764            }
765    
766            private void _loadIndexFromCluster(
767                            IndexAccessor indexAccessor, long localLastGeneration)
768                    throws SystemException {
769    
770                    List<Address> clusterNodeAddresses =
771                            ClusterExecutorUtil.getClusterNodeAddresses();
772    
773                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
774    
775                    if (clusterNodeAddressesCount <= 1) {
776                            if (_log.isDebugEnabled()) {
777                                    _log.debug(
778                                            "Do not load indexes because there is either one portal " +
779                                                    "instance or no portal instances in the cluster");
780                            }
781    
782                            return;
783                    }
784    
785                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
786                            new MethodHandler(
787                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
788                            true);
789    
790                    ClusterExecutorUtil.execute(
791                            clusterRequest,
792                            new LoadIndexClusterResponseCallback(
793                                    indexAccessor, clusterNodeAddressesCount, localLastGeneration));
794            }
795    
        // Maximum time, in milliseconds, that LoadIndexClusterResponseCallback
        // waits on each poll for a cluster node response

        private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;

        // NOTE(review): not referenced in this part of the class; presumably the
        // lifetime, in milliseconds, of tokens created through
        // _createTokenMethodKey -- confirm against the caller

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for TransientTokenUtil.createToken(long) and
        // LuceneHelperUtil.getLastGeneration(long). The latter is wrapped in the
        // multicast cluster request built by _loadIndexFromCluster.

        private static MethodKey _createTokenMethodKey =
                new MethodKey(TransientTokenUtil.class.getName(), "createToken",
                long.class);
        private static MethodKey _getLastGenerationMethodKey =
                new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
                long.class);

        // Instance state. _indexAccessors holds the index accessors keyed by
        // company ID.

        private Analyzer _analyzer;
        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
815    
816            private class LoadIndexClusterEventListener
817                    implements ClusterEventListener {
818    
819                    public void processClusterEvent(ClusterEvent clusterEvent) {
820                            ClusterEventType clusterEventType =
821                                    clusterEvent.getClusterEventType();
822    
823                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
824                                    return;
825                            }
826    
827                            List<Address> clusterNodeAddresses =
828                                    ClusterExecutorUtil.getClusterNodeAddresses();
829                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
830    
831                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
832                                    if (_log.isDebugEnabled()) {
833                                            _log.debug(
834                                                    "Number of original cluster members is greater than " +
835                                                            "one");
836                                    }
837    
838                                    return;
839                            }
840    
841                            long[] companyIds = PortalInstances.getCompanyIds();
842    
843                            for (long companyId : companyIds) {
844                                    loadIndexes(companyId);
845                            }
846    
847                            loadIndexes(CompanyConstants.SYSTEM);
848                    }
849    
850                    private void loadIndexes(long companyId) {
851                            long lastGeneration = getLastGeneration(companyId);
852    
853                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
854                                    return;
855                            }
856    
857                            try {
858                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
859                            }
860                            catch (Exception e) {
861                                    _log.error(
862                                            "Unable to load indexes for company " + companyId, e);
863                            }
864                    }
865    
866            }
867    
868            private class LoadIndexClusterResponseCallback
869                    extends BaseClusterResponseCallback {
870    
871                    public LoadIndexClusterResponseCallback(
872                            IndexAccessor indexAccessor, int clusterNodeAddressesCount,
873                            long localLastGeneration) {
874    
875                            _indexAccessor = indexAccessor;
876                            _clusterNodeAddressesCount = clusterNodeAddressesCount;
877                            _localLastGeneration = localLastGeneration;
878    
879                            _companyId = _indexAccessor.getCompanyId();
880                    }
881    
882                    @Override
883                    public void callback(BlockingQueue<ClusterNodeResponse> blockingQueue) {
884                            Address bootupAddress = null;
885    
886                            do {
887                                    _clusterNodeAddressesCount--;
888    
889                                    ClusterNodeResponse clusterNodeResponse = null;
890    
891                                    try {
892                                            clusterNodeResponse = blockingQueue.poll(
893                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
894                                                    TimeUnit.MILLISECONDS);
895                                    }
896                                    catch (Exception e) {
897                                            _log.error("Unable to get cluster node response", e);
898                                    }
899    
900                                    if (clusterNodeResponse == null) {
901                                            if (_log.isDebugEnabled()) {
902                                                    _log.debug(
903                                                            "Unable to get cluster node response in " +
904                                                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
905                                                                            TimeUnit.MILLISECONDS);
906                                            }
907    
908                                            continue;
909                                    }
910    
911                                    ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
912    
913                                    if (clusterNode.getPort() > 0) {
914                                            try {
915                                                    long remoteLastGeneration =
916                                                            (Long)clusterNodeResponse.getResult();
917    
918                                                    if (remoteLastGeneration > _localLastGeneration) {
919                                                            bootupAddress = clusterNodeResponse.getAddress();
920    
921                                                            break;
922                                                    }
923                                            }
924                                            catch (Exception e) {
925                                                    if (_log.isDebugEnabled()) {
926                                                            _log.debug(
927                                                                    "Suppress exception caused by remote method " +
928                                                                            "invocation",
929                                                                    e);
930                                                    }
931    
932                                                    continue;
933                                            }
934                                    }
935                                    else {
936                                            if (_log.isDebugEnabled()) {
937                                                    _log.debug(
938                                                            "Cluster node " + clusterNode +
939                                                                    " has invalid port");
940                                            }
941                                    }
942                            }
943                            while ((bootupAddress == null) && (_clusterNodeAddressesCount > 1));
944    
945                            if (bootupAddress == null) {
946                                    return;
947                            }
948    
949                            if (_log.isInfoEnabled()) {
950                                    _log.info(
951                                            "Start loading lucene index files from cluster node " +
952                                                    bootupAddress);
953                            }
954    
955                            InputStream inputStream = null;
956    
957                            try {
958                                    inputStream = getLoadIndexesInputStreamFromCluster(
959                                            _companyId, bootupAddress);
960    
961                                    _indexAccessor.loadIndex(inputStream);
962    
963                                    if (_log.isInfoEnabled()) {
964                                            _log.info("Lucene index files loaded successfully");
965                                    }
966                            }
967                            catch (Exception e) {
968                                    _log.error("Unable to load index for company " + _companyId, e);
969                            }
970                            finally {
971                                    if (inputStream != null) {
972                                            try {
973                                                    inputStream.close();
974                                            }
975                                            catch (IOException ioe) {
976                                                    _log.error(
977                                                            "Unable to close input stream for company " +
978                                                                    _companyId,
979                                                            ioe);
980                                            }
981                                    }
982                            }
983                    }
984    
985                    @Override
986                    public void processTimeoutException(TimeoutException timeoutException) {
987                            _log.error(
988                                    "Uanble to load index for company " + _companyId,
989                                    timeoutException);
990                    }
991    
992                    private int _clusterNodeAddressesCount;
993                    private long _companyId;
994                    private IndexAccessor _indexAccessor;
995                    private long _localLastGeneration;
996    
997            }
998    
999    }