001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
034    import com.liferay.portal.kernel.scheduler.SchedulerEntry;
035    import com.liferay.portal.kernel.scheduler.SchedulerEntryImpl;
036    import com.liferay.portal.kernel.scheduler.StorageType;
037    import com.liferay.portal.kernel.scheduler.TimeUnit;
038    import com.liferay.portal.kernel.scheduler.TriggerType;
039    import com.liferay.portal.kernel.search.BooleanClauseOccur;
040    import com.liferay.portal.kernel.search.Field;
041    import com.liferay.portal.kernel.util.ArrayUtil;
042    import com.liferay.portal.kernel.util.GetterUtil;
043    import com.liferay.portal.kernel.util.MethodHandler;
044    import com.liferay.portal.kernel.util.MethodKey;
045    import com.liferay.portal.kernel.util.ObjectValuePair;
046    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
047    import com.liferay.portal.kernel.util.StringPool;
048    import com.liferay.portal.kernel.util.StringUtil;
049    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
050    import com.liferay.portal.kernel.util.Validator;
051    import com.liferay.portal.model.CompanyConstants;
052    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
053    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
054    import com.liferay.portal.search.lucene.messaging.CleanUpMessageListener;
055    import com.liferay.portal.security.auth.TransientTokenUtil;
056    import com.liferay.portal.util.PortalInstances;
057    import com.liferay.portal.util.PropsValues;
058    
059    import java.io.IOException;
060    import java.io.InputStream;
061    import java.io.OutputStream;
062    
063    import java.net.InetAddress;
064    import java.net.URL;
065    import java.net.URLConnection;
066    
067    import java.util.HashSet;
068    import java.util.List;
069    import java.util.Map;
070    import java.util.Set;
071    import java.util.concurrent.BlockingQueue;
072    import java.util.concurrent.ConcurrentHashMap;
073    
074    import org.apache.lucene.analysis.Analyzer;
075    import org.apache.lucene.analysis.TokenStream;
076    import org.apache.lucene.document.Document;
077    import org.apache.lucene.index.Term;
078    import org.apache.lucene.queryParser.QueryParser;
079    import org.apache.lucene.search.BooleanClause;
080    import org.apache.lucene.search.BooleanQuery;
081    import org.apache.lucene.search.IndexSearcher;
082    import org.apache.lucene.search.NumericRangeQuery;
083    import org.apache.lucene.search.Query;
084    import org.apache.lucene.search.TermQuery;
085    import org.apache.lucene.search.TermRangeQuery;
086    import org.apache.lucene.search.WildcardQuery;
087    import org.apache.lucene.search.highlight.Highlighter;
088    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
089    import org.apache.lucene.search.highlight.QueryScorer;
090    import org.apache.lucene.search.highlight.SimpleFragmenter;
091    import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
092    import org.apache.lucene.search.highlight.WeightedTerm;
093    import org.apache.lucene.util.Version;
094    
095    /**
096     * @author Brian Wing Shun Chan
097     * @author Harry Mark
098     * @author Bruno Farache
099     * @author Shuyang Zhou
100     * @author Tina Tian
101     * @author Hugo Huijser
102     */
103    public class LuceneHelperImpl implements LuceneHelper {
104    
105            public void addDocument(long companyId, Document document)
106                    throws IOException {
107    
108                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
109    
110                    indexAccessor.addDocument(document);
111            }
112    
113            public void addExactTerm(
114                    BooleanQuery booleanQuery, String field, String value) {
115    
116                    addTerm(booleanQuery, field, value, false);
117            }
118    
119            public void addNumericRangeTerm(
120                    BooleanQuery booleanQuery, String field, String startValue,
121                    String endValue) {
122    
123                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
124                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
125                            true, true);
126    
127                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
128            }
129    
130            public void addRangeTerm(
131                    BooleanQuery booleanQuery, String field, String startValue,
132                    String endValue) {
133    
134                    boolean includesLower = true;
135    
136                    if (startValue.equals(StringPool.STAR)) {
137                            includesLower = false;
138                    }
139    
140                    boolean includesUpper = true;
141    
142                    if (endValue.equals(StringPool.STAR)) {
143                            includesUpper = false;
144                    }
145    
146                    TermRangeQuery termRangeQuery = new TermRangeQuery(
147                            field, startValue, endValue, includesLower, includesUpper);
148    
149                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
150            }
151    
152            public void addRequiredTerm(
153                    BooleanQuery booleanQuery, String field, String value, boolean like) {
154    
155                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
156            }
157    
158            public void addRequiredTerm(
159                    BooleanQuery booleanQuery, String field, String[] values,
160                    boolean like) {
161    
162                    if (values == null) {
163                            return;
164                    }
165    
166                    BooleanQuery query = new BooleanQuery();
167    
168                    for (String value : values) {
169                            addTerm(query, field, value, like);
170                    }
171    
172                    booleanQuery.add(query, BooleanClause.Occur.MUST);
173            }
174    
175            public void addTerm(
176                    BooleanQuery booleanQuery, String field, String value, boolean like) {
177    
178                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
179            }
180    
181            public void addTerm(
182                    BooleanQuery booleanQuery, String field, String value, boolean like,
183                    BooleanClauseOccur booleanClauseOccur) {
184    
185                    if (Validator.isNull(value)) {
186                            return;
187                    }
188    
189                    if (like) {
190                            value = StringUtil.replace(
191                                    value, StringPool.PERCENT, StringPool.BLANK);
192                    }
193    
194                    try {
195                            QueryParser queryParser = new QueryParser(
196                                    getVersion(), field, getAnalyzer());
197    
198                            Query query = queryParser.parse(value);
199    
200                            BooleanClause.Occur occur = null;
201    
202                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
203                                    occur = BooleanClause.Occur.MUST;
204                            }
205                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
206                                    occur = BooleanClause.Occur.MUST_NOT;
207                            }
208                            else {
209                                    occur = BooleanClause.Occur.SHOULD;
210                            }
211    
212                            _includeIfUnique(booleanQuery, query, occur, like);
213                    }
214                    catch (Exception e) {
215                            _log.error(e, e);
216                    }
217            }
218    
219            public void addTerm(
220                    BooleanQuery booleanQuery, String field, String[] values,
221                    boolean like) {
222    
223                    for (String value : values) {
224                            addTerm(booleanQuery, field, value, like);
225                    }
226            }
227    
228            public int countScoredFieldNames(Query query, String[] filedNames) {
229                    int count = 0;
230    
231                    for (String fieldName : filedNames) {
232                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
233                                    query, false, fieldName);
234    
235                            if ((weightedTerms.length > 0) &&
236                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
237    
238                                    count++;
239                            }
240                    }
241    
242                    return count;
243            }
244    
245            public void delete(long companyId) {
246                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
247    
248                    if (indexAccessor == null) {
249                            return;
250                    }
251    
252                    indexAccessor.delete();
253            }
254    
255            public void deleteDocuments(long companyId, Term term) throws IOException {
256                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
257    
258                    if (indexAccessor == null) {
259                            return;
260                    }
261    
262                    indexAccessor.deleteDocuments(term);
263            }
264    
265            public void dumpIndex(long companyId, OutputStream outputStream)
266                    throws IOException {
267    
268                    long lastGeneration = getLastGeneration(companyId);
269    
270                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
271                            if (_log.isDebugEnabled()) {
272                                    _log.debug(
273                                            "Dump index from cluster is not enabled for " + companyId);
274                            }
275    
276                            return;
277                    }
278    
279                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
280    
281                    if (indexAccessor == null) {
282                            return;
283                    }
284    
285                    indexAccessor.dumpIndex(outputStream);
286            }
287    
288            public Analyzer getAnalyzer() {
289                    return _analyzer;
290            }
291    
292            public long getLastGeneration(long companyId) {
293                    if (!isLoadIndexFromClusterEnabled()) {
294                            return IndexAccessor.DEFAULT_LAST_GENERATION;
295                    }
296    
297                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
298    
299                    if (indexAccessor == null) {
300                            return IndexAccessor.DEFAULT_LAST_GENERATION;
301                    }
302    
303                    return indexAccessor.getLastGeneration();
304            }
305    
306            public InputStream getLoadIndexesInputStreamFromCluster(
307                            long companyId, Address bootupAddress)
308                    throws SystemException {
309    
310                    if (!isLoadIndexFromClusterEnabled()) {
311                            return null;
312                    }
313    
314                    InputStream inputStream = null;
315    
316                    try {
317                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
318                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
319    
320                            URL url = bootupClusterNodeObjectValuePair.getValue();
321    
322                            URLConnection urlConnection = url.openConnection();
323    
324                            urlConnection.setDoOutput(true);
325    
326                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
327                                    urlConnection.getOutputStream());
328    
329                            unsyncPrintWriter.write("transientToken=");
330                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
331                            unsyncPrintWriter.write("&companyId=");
332                            unsyncPrintWriter.write(String.valueOf(companyId));
333    
334                            unsyncPrintWriter.close();
335    
336                            inputStream = urlConnection.getInputStream();
337    
338                            return inputStream;
339                    }
340                    catch (IOException ioe) {
341                            throw new SystemException(ioe);
342                    }
343            }
344    
345            public String[] getQueryTerms(Query query) {
346                    String queryString = StringUtil.replace(
347                            query.toString(), StringPool.STAR, StringPool.BLANK);
348    
349                    Query tempQuery = null;
350    
351                    try {
352                            QueryParser queryParser = new QueryParser(
353                                    getVersion(), StringPool.BLANK, getAnalyzer());
354    
355                            tempQuery = queryParser.parse(queryString);
356                    }
357                    catch (Exception e) {
358                            if (_log.isWarnEnabled()) {
359                                    _log.warn("Unable to parse " + queryString);
360                            }
361    
362                            tempQuery = query;
363                    }
364    
365                    WeightedTerm[] weightedTerms = null;
366    
367                    for (String fieldName : Field.KEYWORDS) {
368                            weightedTerms = QueryTermExtractor.getTerms(
369                                    tempQuery, false, fieldName);
370    
371                            if (weightedTerms.length > 0) {
372                                    break;
373                            }
374                    }
375    
376                    Set<String> queryTerms = new HashSet<String>();
377    
378                    for (WeightedTerm weightedTerm : weightedTerms) {
379                            queryTerms.add(weightedTerm.getTerm());
380                    }
381    
382                    return queryTerms.toArray(new String[queryTerms.size()]);
383            }
384    
385            public IndexSearcher getSearcher(long companyId, boolean readOnly)
386                    throws IOException {
387    
388                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
389    
390                    IndexSearcher indexSearcher = new IndexSearcher(
391                            indexAccessor.getLuceneDir(), readOnly);
392    
393                    indexSearcher.setDefaultFieldSortScoring(true, true);
394                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
395    
396                    return indexSearcher;
397            }
398    
399            public String getSnippet(
400                            Query query, String field, String s, int maxNumFragments,
401                            int fragmentLength, String fragmentSuffix, String preTag,
402                            String postTag)
403                    throws IOException {
404    
405                    SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
406                            preTag, postTag);
407    
408                    QueryScorer queryScorer = new QueryScorer(query, field);
409    
410                    Highlighter highlighter = new Highlighter(
411                            simpleHTMLFormatter, queryScorer);
412    
413                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
414    
415                    TokenStream tokenStream = getAnalyzer().tokenStream(
416                            field, new UnsyncStringReader(s));
417    
418                    try {
419                            String snippet = highlighter.getBestFragments(
420                                    tokenStream, s, maxNumFragments, fragmentSuffix);
421    
422                            if (Validator.isNotNull(snippet) &&
423                                    !StringUtil.endsWith(snippet, fragmentSuffix)) {
424    
425                                    snippet = snippet.concat(fragmentSuffix);
426                            }
427    
428                            return snippet;
429                    }
430                    catch (InvalidTokenOffsetsException itoe) {
431                            throw new IOException(itoe.getMessage());
432                    }
433            }
434    
435            public Version getVersion() {
436                    return _version;
437            }
438    
439            public boolean isLoadIndexFromClusterEnabled() {
440                    if (PropsValues.CLUSTER_LINK_ENABLED &&
441                            PropsValues.LUCENE_REPLICATE_WRITE) {
442    
443                            return true;
444                    }
445    
446                    if (_log.isDebugEnabled()) {
447                            _log.debug("Load index from cluster is not enabled");
448                    }
449    
450                    return false;
451            }
452    
453            public void loadIndex(long companyId, InputStream inputStream)
454                    throws IOException {
455    
456                    if (!isLoadIndexFromClusterEnabled()) {
457                            return;
458                    }
459    
460                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
461    
462                    if (indexAccessor == null) {
463                            return;
464                    }
465    
466                    indexAccessor.loadIndex(inputStream);
467            }
468    
469            public Address selectBootupClusterAddress(
470                            long companyId, long localLastGeneration)
471                    throws SystemException {
472    
473                    if (!isLoadIndexFromClusterEnabled()) {
474                            return null;
475                    }
476    
477                    List<Address> clusterNodeAddresses =
478                            ClusterExecutorUtil.getClusterNodeAddresses();
479    
480                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
481    
482                    if (clusterNodeAddressesCount <= 1) {
483                            if (_log.isDebugEnabled()) {
484                                    _log.debug(
485                                            "Do not load indexes because there is either one portal " +
486                                                    "instance or no portal instances in the cluster");
487                            }
488    
489                            return null;
490                    }
491    
492                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
493                            new MethodHandler(_getLastGenerationMethodKey, companyId),
494                            true);
495    
496                    FutureClusterResponses futureClusterResponses =
497                            ClusterExecutorUtil.execute(clusterRequest);
498    
499                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
500                            futureClusterResponses.getPartialResults();
501    
502                    Address bootupAddress = null;
503    
504                    do {
505                            clusterNodeAddressesCount--;
506    
507                            ClusterNodeResponse clusterNodeResponse = null;
508    
509                            try {
510                                    clusterNodeResponse = clusterNodeResponses.poll(
511                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
512                                            java.util.concurrent.TimeUnit.MILLISECONDS);
513                            }
514                            catch (Exception e) {
515                                    throw new SystemException(e);
516                            }
517    
518                            if (clusterNodeResponse == null) {
519                                    if (_log.isDebugEnabled()) {
520                                            _log.debug(
521                                                    "Unable to get cluster node response in " +
522                                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
523                                                                    java.util.concurrent.TimeUnit.MILLISECONDS);
524                                    }
525    
526                                    continue;
527                            }
528    
529                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
530    
531                            if (clusterNode.getPort() > 0) {
532                                    try {
533                                            long remoteLastGeneration =
534                                                    (Long)clusterNodeResponse.getResult();
535    
536                                            if (remoteLastGeneration > localLastGeneration) {
537                                                    bootupAddress = clusterNodeResponse.getAddress();
538    
539                                                    break;
540                                            }
541                                    }
542                                    catch (Exception e) {
543                                            if (_log.isDebugEnabled()) {
544                                                    _log.debug(
545                                                            "Suppress exception caused by remote method " +
546                                                                    "invocation",
547                                                            e);
548                                            }
549    
550                                            continue;
551                                    }
552                            }
553                            else {
554                                    if (_log.isDebugEnabled()) {
555                                            _log.debug(
556                                                    "Cluster node " + clusterNode + " has invalid port");
557                                    }
558                            }
559                    } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
560    
561                    return bootupAddress;
562            }
563    
564            public void setAnalyzer(Analyzer analyzer) {
565                    _analyzer = analyzer;
566            }
567    
568            public void setVersion(Version version) {
569                    _version = version;
570            }
571    
572            public void shutdown() {
573                    if (_luceneIndexThreadPoolExecutor != null) {
574                            _luceneIndexThreadPoolExecutor.shutdownNow();
575    
576                            try {
577                                    _luceneIndexThreadPoolExecutor.awaitTermination(
578                                            60, java.util.concurrent.TimeUnit.SECONDS);
579                            }
580                            catch (InterruptedException ie) {
581                                    _log.error("Lucene indexer shutdown interrupted", ie);
582                            }
583                    }
584    
585                    if (isLoadIndexFromClusterEnabled()) {
586                            ClusterExecutorUtil.removeClusterEventListener(
587                                    _loadIndexClusterEventListener);
588                    }
589    
590                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
591                            indexAccessor.close();
592                    }
593            }
594    
595            public void startup(long companyId) {
596                    if (PropsValues.INDEX_ON_STARTUP) {
597                            if (_log.isInfoEnabled()) {
598                                    _log.info("Indexing Lucene on startup");
599                            }
600    
601                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
602    
603                            if (PropsValues.INDEX_WITH_THREAD) {
604                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
605                            }
606                            else {
607                                    luceneIndexer.reindex();
608                            }
609                    }
610    
611                    if (PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {
612                            SchedulerEntry schedulerEntry = new SchedulerEntryImpl();
613    
614                            schedulerEntry.setEventListenerClass(
615                                    CleanUpMessageListener.class.getName());
616                            schedulerEntry.setTimeUnit(TimeUnit.MINUTE);
617                            schedulerEntry.setTriggerType(TriggerType.SIMPLE);
618                            schedulerEntry.setTriggerValue(
619                                    PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL);
620    
621                            try {
622                                    SchedulerEngineUtil.schedule(
623                                            schedulerEntry, StorageType.MEMORY,
624                                            PortalClassLoaderUtil.getClassLoader(), 0);
625                            }
626                            catch (Exception e) {
627                                    _log.error(e, e);
628                            }
629                    }
630            }
631    
632            public void updateDocument(long companyId, Term term, Document document)
633                    throws IOException {
634    
635                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
636    
637                    indexAccessor.updateDocument(term, document);
638            }
639    
640            private LuceneHelperImpl() {
641                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
642                            _luceneIndexThreadPoolExecutor =
643                                    PortalExecutorManagerUtil.getPortalExecutor(
644                                            LuceneHelperImpl.class.getName());
645                    }
646    
647                    if (isLoadIndexFromClusterEnabled()) {
648                            _loadIndexClusterEventListener =
649                                    new LoadIndexClusterEventListener();
650    
651                            ClusterExecutorUtil.addClusterEventListener(
652                                    _loadIndexClusterEventListener);
653                    }
654            }
655    
656            private ObjectValuePair<String, URL>
657                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
658                    throws SystemException {
659    
660                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
661                            new MethodHandler(
662                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
663                            bootupAddress);
664    
665                    FutureClusterResponses futureClusterResponses =
666                            ClusterExecutorUtil.execute(clusterRequest);
667    
668                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
669                            futureClusterResponses.getPartialResults();
670    
671                    try {
672                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
673                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
674                                    java.util.concurrent.TimeUnit.MILLISECONDS);
675    
676                            String transientToken = (String)clusterNodeResponse.getResult();
677    
678                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
679    
680                            InetAddress inetAddress = clusterNode.getInetAddress();
681    
682                            URL url = new URL(
683                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
684                                    "/lucene/dump");
685    
686                            return new ObjectValuePair<String, URL>(transientToken, url);
687                    }
688                    catch (Exception e) {
689                            throw new SystemException(e);
690                    }
691            }
692    
693            private IndexAccessor _getIndexAccessor(long companyId) {
694                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
695    
696                    if (indexAccessor != null) {
697                            return indexAccessor;
698                    }
699    
700                    synchronized (this) {
701                            indexAccessor = _indexAccessors.get(companyId);
702    
703                            if (indexAccessor == null) {
704                                    indexAccessor = new IndexAccessorImpl(companyId);
705    
706                                    if (isLoadIndexFromClusterEnabled()) {
707                                            InputStream inputStream = null;
708    
709                                            try {
710                                                    Address bootupAddress = selectBootupClusterAddress(
711                                                            companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
712    
713                                                    if (bootupAddress != null) {
714                                                            inputStream = getLoadIndexesInputStreamFromCluster(
715                                                                    companyId, bootupAddress);
716    
717                                                            indexAccessor.loadIndex(inputStream);
718                                                    }
719    
720                                                    indexAccessor.enableDumpIndex();
721                                            }
722                                            catch (Exception e) {
723                                                    _log.error(
724                                                            "Unable to load index for company " +
725                                                                    indexAccessor.getCompanyId(),
726                                                            e);
727                                            }
728                                            finally {
729                                                    if (inputStream != null) {
730                                                            try {
731                                                                    inputStream.close();
732                                                            }
733                                                            catch (IOException ioe) {
734                                                                    _log.error(
735                                                                            "Unable to close input stream for " +
736                                                                                    "company " +
737                                                                                            indexAccessor.getCompanyId(),
738                                                                            ioe);
739                                                            }
740                                                    }
741                                            }
742                                    }
743    
744                                    _indexAccessors.put(companyId, indexAccessor);
745                            }
746                    }
747    
748                    return indexAccessor;
749            }
750    
751            private void _includeIfUnique(
752                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
753                    boolean like) {
754    
755                    if (query instanceof TermQuery) {
756                            Set<Term> terms = new HashSet<Term>();
757    
758                            query.extractTerms(terms);
759    
760                            for (Term term : terms) {
761                                    String termValue = term.text();
762    
763                                    if (like) {
764                                            term = term.createTerm(
765                                                    StringPool.STAR.concat(termValue).concat(
766                                                            StringPool.STAR));
767    
768                                            query = new WildcardQuery(term);
769                                    }
770                                    else {
771                                            query = new TermQuery(term);
772                                    }
773    
774                                    boolean included = false;
775    
776                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
777                                            if (query.equals(booleanClause.getQuery())) {
778                                                    included = true;
779                                            }
780                                    }
781    
782                                    if (!included) {
783                                            booleanQuery.add(query, occur);
784                                    }
785                            }
786                    }
787                    else if (query instanceof BooleanQuery) {
788                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
789    
790                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
791                                    _includeIfUnique(
792                                            booleanQuery, booleanClause.getQuery(),
793                                            booleanClause.getOccur(), like);
794                            }
795                    }
796                    else {
797                            boolean included = false;
798    
799                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
800                                    if (query.equals(booleanClause.getQuery())) {
801                                            included = true;
802                                    }
803                            }
804    
805                            if (!included) {
806                                    booleanQuery.add(query, occur);
807                            }
808                    }
809            }
810    
811            private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
812    
813            private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
814    
815            private static MethodKey _createTokenMethodKey =
816                    new MethodKey(TransientTokenUtil.class.getName(), "createToken",
817                    long.class);
818            private static MethodKey _getLastGenerationMethodKey =
819                    new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
820                    long.class);
821    
822            private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
823    
824            private Analyzer _analyzer;
825            private Map<Long, IndexAccessor> _indexAccessors =
826                    new ConcurrentHashMap<Long, IndexAccessor>();
827            private LoadIndexClusterEventListener _loadIndexClusterEventListener;
828            private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
829            private Version _version;
830    
831            private class LoadIndexClusterEventListener
832                    implements ClusterEventListener {
833    
834                    public void processClusterEvent(ClusterEvent clusterEvent) {
835                            ClusterEventType clusterEventType =
836                                    clusterEvent.getClusterEventType();
837    
838                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
839                                    return;
840                            }
841    
842                            List<Address> clusterNodeAddresses =
843                                    ClusterExecutorUtil.getClusterNodeAddresses();
844                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
845    
846                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
847                                    if (_log.isDebugEnabled()) {
848                                            _log.debug(
849                                                    "Number of original cluster members is greater than " +
850                                                            "one");
851                                    }
852    
853                                    return;
854                            }
855    
856                            long[] companyIds = PortalInstances.getCompanyIds();
857    
858                            for (long companyId : companyIds) {
859                                    loadIndexes(companyId);
860                            }
861    
862                            loadIndexes(CompanyConstants.SYSTEM);
863                    }
864    
865                    private void loadIndexes(long companyId) {
866                            long lastGeneration = getLastGeneration(companyId);
867    
868                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
869                                    return;
870                            }
871    
872                            try {
873                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
874                            }
875                            catch (Exception e) {
876                                    _log.error(
877                                            "Unable to load indexes for company " + companyId, e);
878                            }
879                    }
880    
881            }
882    
883    }