001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
034    import com.liferay.portal.kernel.scheduler.SchedulerEntry;
035    import com.liferay.portal.kernel.scheduler.SchedulerEntryImpl;
036    import com.liferay.portal.kernel.scheduler.StorageType;
037    import com.liferay.portal.kernel.scheduler.TimeUnit;
038    import com.liferay.portal.kernel.scheduler.TriggerType;
039    import com.liferay.portal.kernel.search.Field;
040    import com.liferay.portal.kernel.util.ArrayUtil;
041    import com.liferay.portal.kernel.util.GetterUtil;
042    import com.liferay.portal.kernel.util.MethodHandler;
043    import com.liferay.portal.kernel.util.MethodKey;
044    import com.liferay.portal.kernel.util.ObjectValuePair;
045    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
046    import com.liferay.portal.kernel.util.StringPool;
047    import com.liferay.portal.kernel.util.StringUtil;
048    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
049    import com.liferay.portal.kernel.util.Validator;
050    import com.liferay.portal.model.CompanyConstants;
051    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
052    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
053    import com.liferay.portal.search.lucene.messaging.CleanUpMessageListener;
054    import com.liferay.portal.security.auth.TransientTokenUtil;
055    import com.liferay.portal.util.PortalInstances;
056    import com.liferay.portal.util.PropsValues;
057    
058    import java.io.IOException;
059    import java.io.InputStream;
060    import java.io.OutputStream;
061    
062    import java.net.InetAddress;
063    import java.net.URL;
064    import java.net.URLConnection;
065    
066    import java.util.HashSet;
067    import java.util.List;
068    import java.util.Map;
069    import java.util.Set;
070    import java.util.concurrent.BlockingQueue;
071    import java.util.concurrent.ConcurrentHashMap;
072    
073    import org.apache.lucene.analysis.Analyzer;
074    import org.apache.lucene.analysis.TokenStream;
075    import org.apache.lucene.document.Document;
076    import org.apache.lucene.index.Term;
077    import org.apache.lucene.queryParser.QueryParser;
078    import org.apache.lucene.search.BooleanClause;
079    import org.apache.lucene.search.BooleanQuery;
080    import org.apache.lucene.search.IndexSearcher;
081    import org.apache.lucene.search.NumericRangeQuery;
082    import org.apache.lucene.search.Query;
083    import org.apache.lucene.search.TermQuery;
084    import org.apache.lucene.search.TermRangeQuery;
085    import org.apache.lucene.search.WildcardQuery;
086    import org.apache.lucene.search.highlight.Highlighter;
087    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
088    import org.apache.lucene.search.highlight.QueryScorer;
089    import org.apache.lucene.search.highlight.SimpleFragmenter;
090    import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
091    import org.apache.lucene.search.highlight.WeightedTerm;
092    import org.apache.lucene.util.Version;
093    
094    /**
095     * @author Brian Wing Shun Chan
096     * @author Harry Mark
097     * @author Bruno Farache
098     * @author Shuyang Zhou
099     * @author Tina Tian
100     * @author Hugo Huijser
101     */
102    public class LuceneHelperImpl implements LuceneHelper {
103    
104            public void addDocument(long companyId, Document document)
105                    throws IOException {
106    
107                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
108    
109                    indexAccessor.addDocument(document);
110            }
111    
112            public void addExactTerm(
113                    BooleanQuery booleanQuery, String field, String value) {
114    
115                    addTerm(booleanQuery, field, value, false);
116            }
117    
118            public void addNumericRangeTerm(
119                    BooleanQuery booleanQuery, String field, String startValue,
120                    String endValue) {
121    
122                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
123                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
124                            true, true);
125    
126                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
127            }
128    
129            public void addRangeTerm(
130                    BooleanQuery booleanQuery, String field, String startValue,
131                    String endValue) {
132    
133                    boolean includesLower = true;
134    
135                    if (startValue.equals(StringPool.STAR)) {
136                            includesLower = false;
137                    }
138    
139                    boolean includesUpper = true;
140    
141                    if (endValue.equals(StringPool.STAR)) {
142                            includesUpper = false;
143                    }
144    
145                    TermRangeQuery termRangeQuery = new TermRangeQuery(
146                            field, startValue, endValue, includesLower, includesUpper);
147    
148                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
149            }
150    
151            public void addRequiredTerm(
152                    BooleanQuery booleanQuery, String field, String value, boolean like) {
153    
154                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
155            }
156    
157            public void addRequiredTerm(
158                    BooleanQuery booleanQuery, String field, String[] values,
159                    boolean like) {
160    
161                    if (values == null) {
162                            return;
163                    }
164    
165                    BooleanQuery query = new BooleanQuery();
166    
167                    for (String value : values) {
168                            addTerm(query, field, value, like);
169                    }
170    
171                    booleanQuery.add(query, BooleanClause.Occur.MUST);
172            }
173    
174            public void addTerm(
175                    BooleanQuery booleanQuery, String field, String value, boolean like) {
176    
177                    if (Validator.isNull(value)) {
178                            return;
179                    }
180    
181                    if (like) {
182                            value = StringUtil.replace(
183                                    value, StringPool.PERCENT, StringPool.BLANK);
184                    }
185    
186                    try {
187                            QueryParser queryParser = new QueryParser(
188                                    getVersion(), field, getAnalyzer());
189    
190                            Query query = queryParser.parse(value);
191    
192                            _includeIfUnique(
193                                    booleanQuery, query, BooleanClause.Occur.SHOULD, like);
194                    }
195                    catch (Exception e) {
196                            _log.error(e, e);
197                    }
198            }
199    
200            public void addTerm(
201                    BooleanQuery booleanQuery, String field, String[] values,
202                    boolean like) {
203    
204                    for (String value : values) {
205                            addTerm(booleanQuery, field, value, like);
206                    }
207            }
208    
209            public int countScoredFieldNames(Query query, String[] filedNames) {
210                    int count = 0;
211    
212                    for (String fieldName : filedNames) {
213                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
214                                    query, false, fieldName);
215    
216                            if ((weightedTerms.length > 0) &&
217                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
218    
219                                    count++;
220                            }
221                    }
222    
223                    return count;
224            }
225    
226            public void delete(long companyId) {
227                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
228    
229                    if (indexAccessor == null) {
230                            return;
231                    }
232    
233                    indexAccessor.delete();
234            }
235    
236            public void deleteDocuments(long companyId, Term term) throws IOException {
237                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
238    
239                    if (indexAccessor == null) {
240                            return;
241                    }
242    
243                    indexAccessor.deleteDocuments(term);
244            }
245    
246            public void dumpIndex(long companyId, OutputStream outputStream)
247                    throws IOException {
248    
249                    long lastGeneration = getLastGeneration(companyId);
250    
251                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
252                            if (_log.isDebugEnabled()) {
253                                    _log.debug(
254                                            "Dump index from cluster is not enabled for " + companyId);
255                            }
256    
257                            return;
258                    }
259    
260                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
261    
262                    if (indexAccessor == null) {
263                            return;
264                    }
265    
266                    indexAccessor.dumpIndex(outputStream);
267            }
268    
269            public Analyzer getAnalyzer() {
270                    return _analyzer;
271            }
272    
273            public long getLastGeneration(long companyId) {
274                    if (!isLoadIndexFromClusterEnabled()) {
275                            return IndexAccessor.DEFAULT_LAST_GENERATION;
276                    }
277    
278                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
279    
280                    if (indexAccessor == null) {
281                            return IndexAccessor.DEFAULT_LAST_GENERATION;
282                    }
283    
284                    return indexAccessor.getLastGeneration();
285            }
286    
287            public InputStream getLoadIndexesInputStreamFromCluster(
288                            long companyId, Address bootupAddress)
289                    throws SystemException {
290    
291                    if (!isLoadIndexFromClusterEnabled()) {
292                            return null;
293                    }
294    
295                    InputStream inputStream = null;
296    
297                    try {
298                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
299                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
300    
301                            URL url = bootupClusterNodeObjectValuePair.getValue();
302    
303                            URLConnection urlConnection = url.openConnection();
304    
305                            urlConnection.setDoOutput(true);
306    
307                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
308                                    urlConnection.getOutputStream());
309    
310                            unsyncPrintWriter.write("transientToken=");
311                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
312                            unsyncPrintWriter.write("&companyId=");
313                            unsyncPrintWriter.write(String.valueOf(companyId));
314    
315                            unsyncPrintWriter.close();
316    
317                            inputStream = urlConnection.getInputStream();
318    
319                            return inputStream;
320                    }
321                    catch (IOException ioe) {
322                            throw new SystemException(ioe);
323                    }
324            }
325    
326            public String[] getQueryTerms(Query query) {
327                    String queryString = StringUtil.replace(
328                            query.toString(), StringPool.STAR, StringPool.BLANK);
329    
330                    Query tempQuery = null;
331    
332                    try {
333                            QueryParser queryParser = new QueryParser(
334                                    getVersion(), StringPool.BLANK, getAnalyzer());
335    
336                            tempQuery = queryParser.parse(queryString);
337                    }
338                    catch (Exception e) {
339                            if (_log.isWarnEnabled()) {
340                                    _log.warn("Unable to parse " + queryString);
341                            }
342    
343                            tempQuery = query;
344                    }
345    
346                    WeightedTerm[] weightedTerms = null;
347    
348                    for (String fieldName : Field.KEYWORDS) {
349                            weightedTerms = QueryTermExtractor.getTerms(
350                                    tempQuery, false, fieldName);
351    
352                            if (weightedTerms.length > 0) {
353                                    break;
354                            }
355                    }
356    
357                    Set<String> queryTerms = new HashSet<String>();
358    
359                    for (WeightedTerm weightedTerm : weightedTerms) {
360                            queryTerms.add(weightedTerm.getTerm());
361                    }
362    
363                    return queryTerms.toArray(new String[queryTerms.size()]);
364            }
365    
366            public IndexSearcher getSearcher(long companyId, boolean readOnly)
367                    throws IOException {
368    
369                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
370    
371                    IndexSearcher indexSearcher = new IndexSearcher(
372                            indexAccessor.getLuceneDir(), readOnly);
373    
374                    indexSearcher.setDefaultFieldSortScoring(true, true);
375                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
376    
377                    return indexSearcher;
378            }
379    
380            public String getSnippet(
381                            Query query, String field, String s, int maxNumFragments,
382                            int fragmentLength, String fragmentSuffix, String preTag,
383                            String postTag)
384                    throws IOException {
385    
386                    SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
387                            preTag, postTag);
388    
389                    QueryScorer queryScorer = new QueryScorer(query, field);
390    
391                    Highlighter highlighter = new Highlighter(
392                            simpleHTMLFormatter, queryScorer);
393    
394                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
395    
396                    TokenStream tokenStream = getAnalyzer().tokenStream(
397                            field, new UnsyncStringReader(s));
398    
399                    try {
400                            String snippet = highlighter.getBestFragments(
401                                    tokenStream, s, maxNumFragments, fragmentSuffix);
402    
403                            if (Validator.isNotNull(snippet) &&
404                                    !StringUtil.endsWith(snippet, fragmentSuffix)) {
405    
406                                    snippet = snippet.concat(fragmentSuffix);
407                            }
408    
409                            return snippet;
410                    }
411                    catch (InvalidTokenOffsetsException itoe) {
412                            throw new IOException(itoe.getMessage());
413                    }
414            }
415    
416            public Version getVersion() {
417                    return _version;
418            }
419    
420            public boolean isLoadIndexFromClusterEnabled() {
421                    if (PropsValues.CLUSTER_LINK_ENABLED &&
422                            PropsValues.LUCENE_REPLICATE_WRITE) {
423    
424                            return true;
425                    }
426    
427                    if (_log.isDebugEnabled()) {
428                            _log.debug("Load index from cluster is not enabled");
429                    }
430    
431                    return false;
432            }
433    
434            public void loadIndex(long companyId, InputStream inputStream)
435                    throws IOException {
436    
437                    if (!isLoadIndexFromClusterEnabled()) {
438                            return;
439                    }
440    
441                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
442    
443                    if (indexAccessor == null) {
444                            return;
445                    }
446    
447                    indexAccessor.loadIndex(inputStream);
448            }
449    
450            public Address selectBootupClusterAddress(
451                            long companyId, long localLastGeneration)
452                    throws SystemException {
453    
454                    if (!isLoadIndexFromClusterEnabled()) {
455                            return null;
456                    }
457    
458                    List<Address> clusterNodeAddresses =
459                            ClusterExecutorUtil.getClusterNodeAddresses();
460    
461                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
462    
463                    if (clusterNodeAddressesCount <= 1) {
464                            if (_log.isDebugEnabled()) {
465                                    _log.debug(
466                                            "Do not load indexes because there is either one portal " +
467                                                    "instance or no portal instances in the cluster");
468                            }
469    
470                            return null;
471                    }
472    
473                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
474                            new MethodHandler(_getLastGenerationMethodKey, companyId),
475                            true);
476    
477                    FutureClusterResponses futureClusterResponses =
478                            ClusterExecutorUtil.execute(clusterRequest);
479    
480                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
481                            futureClusterResponses.getPartialResults();
482    
483                    Address bootupAddress = null;
484    
485                    do{
486                            clusterNodeAddressesCount--;
487    
488                            ClusterNodeResponse clusterNodeResponse = null;
489    
490                            try {
491                                    clusterNodeResponse = clusterNodeResponses.poll(
492                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
493                                            java.util.concurrent.TimeUnit.MILLISECONDS);
494                            }
495                            catch (Exception e) {
496                                    throw new SystemException(e);
497                            }
498    
499                            if (clusterNodeResponse == null) {
500                                    if (_log.isDebugEnabled()) {
501                                            _log.debug(
502                                                    "Unable to get cluster node response in " +
503                                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
504                                                                    java.util.concurrent.TimeUnit.MILLISECONDS);
505                                    }
506    
507                                    continue;
508                            }
509    
510                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
511    
512                            if (clusterNode.getPort() > 0) {
513                                    try {
514                                            long remoteLastGeneration =
515                                                    (Long)clusterNodeResponse.getResult();
516    
517                                            if (remoteLastGeneration > localLastGeneration) {
518                                                    bootupAddress = clusterNodeResponse.getAddress();
519    
520                                                    break;
521                                            }
522                                    }
523                                    catch (Exception e) {
524                                            if (_log.isDebugEnabled()) {
525                                                    _log.debug(
526                                                            "Suppress exception caused by remote method " +
527                                                                    "invocation",
528                                                            e);
529                                            }
530    
531                                            continue;
532                                    }
533                            }
534                            else {
535                                    if (_log.isDebugEnabled()) {
536                                            _log.debug(
537                                                    "Cluster node " + clusterNode + " has invalid port");
538                                    }
539                            }
540                    } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
541    
542                    return bootupAddress;
543            }
544    
545            public void setAnalyzer(Analyzer analyzer) {
546                    _analyzer = analyzer;
547            }
548    
549            public void setVersion(Version version) {
550                    _version = version;
551            }
552    
553            public void shutdown() {
554                    if (_luceneIndexThreadPoolExecutor != null) {
555                            _luceneIndexThreadPoolExecutor.shutdownNow();
556    
557                            try {
558                                    _luceneIndexThreadPoolExecutor.awaitTermination(
559                                            60, java.util.concurrent.TimeUnit.SECONDS);
560                            }
561                            catch (InterruptedException ie) {
562                                    _log.error("Lucene indexer shutdown interrupted", ie);
563                            }
564                    }
565    
566                    if (isLoadIndexFromClusterEnabled()) {
567                            ClusterExecutorUtil.removeClusterEventListener(
568                                    _loadIndexClusterEventListener);
569                    }
570    
571                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
572                            indexAccessor.close();
573                    }
574            }
575    
576            public void startup(long companyId) {
577                    if (PropsValues.INDEX_ON_STARTUP) {
578                            if (_log.isInfoEnabled()) {
579                                    _log.info("Indexing Lucene on startup");
580                            }
581    
582                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
583    
584                            if (PropsValues.INDEX_WITH_THREAD) {
585                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
586                            }
587                            else {
588                                    luceneIndexer.reindex();
589                            }
590                    }
591    
592                    if (PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {
593                            SchedulerEntry schedulerEntry = new SchedulerEntryImpl();
594    
595                            schedulerEntry.setEventListenerClass(
596                                    CleanUpMessageListener.class.getName());
597                            schedulerEntry.setTimeUnit(TimeUnit.MINUTE);
598                            schedulerEntry.setTriggerType(TriggerType.SIMPLE);
599                            schedulerEntry.setTriggerValue(
600                                    PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL);
601    
602                            try {
603                                    SchedulerEngineUtil.schedule(
604                                            schedulerEntry, StorageType.MEMORY,
605                                            PortalClassLoaderUtil.getClassLoader(), 0);
606                            }
607                            catch (Exception e) {
608                                    _log.error(e, e);
609                            }
610                    }
611            }
612    
613            public void updateDocument(long companyId, Term term, Document document)
614                    throws IOException {
615    
616                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
617    
618                    indexAccessor.updateDocument(term, document);
619            }
620    
621            private LuceneHelperImpl() {
622                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
623                            _luceneIndexThreadPoolExecutor =
624                                    PortalExecutorManagerUtil.getPortalExecutor(
625                                            LuceneHelperImpl.class.getName());
626                    }
627    
628                    if (isLoadIndexFromClusterEnabled()) {
629                            _loadIndexClusterEventListener =
630                                    new LoadIndexClusterEventListener();
631    
632                            ClusterExecutorUtil.addClusterEventListener(
633                                    _loadIndexClusterEventListener);
634                    }
635            }
636    
637            private ObjectValuePair<String, URL>
638                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
639                    throws SystemException {
640    
641                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
642                            new MethodHandler(
643                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
644                            bootupAddress);
645    
646                    FutureClusterResponses futureClusterResponses =
647                            ClusterExecutorUtil.execute(clusterRequest);
648    
649                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
650                            futureClusterResponses.getPartialResults();
651    
652                    try {
653                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
654                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
655                                    java.util.concurrent.TimeUnit.MILLISECONDS);
656    
657                            String transientToken = (String)clusterNodeResponse.getResult();
658    
659                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
660    
661                            InetAddress inetAddress = clusterNode.getInetAddress();
662    
663                            URL url = new URL(
664                                    "http", inetAddress.getHostAddress(),
665                                    clusterNode.getPort(), "/lucene/dump");
666    
667                            return new ObjectValuePair<String, URL>(transientToken, url);
668                    }
669                    catch (Exception e) {
670                            throw new SystemException(e);
671                    }
672            }
673    
674            private IndexAccessor _getIndexAccessor(long companyId) {
675                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
676    
677                    if (indexAccessor != null) {
678                            return indexAccessor;
679                    }
680    
681                    synchronized (this) {
682                            indexAccessor = _indexAccessors.get(companyId);
683    
684                            if (indexAccessor == null) {
685                                    indexAccessor = new IndexAccessorImpl(companyId);
686    
687                                    if (isLoadIndexFromClusterEnabled()) {
688                                            InputStream inputStream = null;
689    
690                                            try {
691                                                    Address bootupAddress = selectBootupClusterAddress(
692                                                            companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
693    
694                                                    if (bootupAddress != null) {
695                                                            inputStream = getLoadIndexesInputStreamFromCluster(
696                                                                    companyId, bootupAddress);
697    
698                                                            indexAccessor.loadIndex(inputStream);
699                                                    }
700    
701                                                    indexAccessor.enableDumpIndex();
702                                            }
703                                            catch (Exception e) {
704                                                    _log.error(
705                                                            "Unable to load index for company " +
706                                                                    indexAccessor.getCompanyId(),
707                                                            e);
708                                            }
709                                            finally {
710                                                    if (inputStream != null) {
711                                                            try {
712                                                                    inputStream.close();
713                                                            }
714                                                            catch (IOException ioe) {
715                                                                    _log.error(
716                                                                            "Unable to close input stream for " +
717                                                                                    "company " +
718                                                                                            indexAccessor.getCompanyId(),
719                                                                            ioe);
720                                                            }
721                                                    }
722                                            }
723                                    }
724    
725                                    _indexAccessors.put(companyId, indexAccessor);
726                            }
727                    }
728    
729                    return indexAccessor;
730            }
731    
732            private void _includeIfUnique(
733                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
734                    boolean like) {
735    
736                    if (query instanceof TermQuery) {
737                            Set<Term> terms = new HashSet<Term>();
738    
739                            query.extractTerms(terms);
740    
741                            for (Term term : terms) {
742                                    String termValue = term.text();
743    
744                                    if (like) {
745                                            term = term.createTerm(
746                                                    StringPool.STAR.concat(termValue).concat(
747                                                            StringPool.STAR));
748    
749                                            query = new WildcardQuery(term);
750                                    }
751                                    else {
752                                            query = new TermQuery(term);
753                                    }
754    
755                                    boolean included = false;
756    
757                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
758                                            if (query.equals(booleanClause.getQuery())) {
759                                                    included = true;
760                                            }
761                                    }
762    
763                                    if (!included) {
764                                            booleanQuery.add(query, occur);
765                                    }
766                            }
767                    }
768                    else if (query instanceof BooleanQuery) {
769                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
770    
771                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
772                                    _includeIfUnique(
773                                            booleanQuery, booleanClause.getQuery(),
774                                            booleanClause.getOccur(), like);
775                            }
776                    }
777                    else {
778                            boolean included = false;
779    
780                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
781                                    if (query.equals(booleanClause.getQuery())) {
782                                            included = true;
783                                    }
784                            }
785    
786                            if (!included) {
787                                    booleanQuery.add(query, occur);
788                            }
789                    }
790            }
791    
792            private class LoadIndexClusterEventListener
793                    implements ClusterEventListener {
794    
795                    public void processClusterEvent(ClusterEvent clusterEvent) {
796                            ClusterEventType clusterEventType =
797                                    clusterEvent.getClusterEventType();
798    
799                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
800                                    return;
801                            }
802    
803                            List<Address> clusterNodeAddresses =
804                                    ClusterExecutorUtil.getClusterNodeAddresses();
805                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
806    
807                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
808                                    if (_log.isDebugEnabled()) {
809                                            _log.debug(
810                                                    "Number of original cluster members is greater than " +
811                                                            "one");
812                                    }
813    
814                                    return;
815                            }
816    
817                            long[] companyIds = PortalInstances.getCompanyIds();
818    
819                            for (long companyId : companyIds) {
820                                    loadIndexes(companyId);
821                            }
822    
823                            loadIndexes(CompanyConstants.SYSTEM);
824                    }
825    
826                    private void loadIndexes(long companyId) {
827                            long lastGeneration = getLastGeneration(companyId);
828    
829                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
830                                    return;
831                            }
832    
833                            try {
834                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
835                            }
836                            catch (Exception e) {
837                                    _log.error(
838                                            "Unable to load indexes for company " + companyId, e);
839                            }
840                    }
841    
842            }
843    
        // NOTE(review): presumably how long to wait for bootup cluster node
        // responses, in milliseconds -- confirm unit and usage at the call site

        private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;

        // NOTE(review): presumably the keep alive time handed to the transient
        // token creation call, in milliseconds -- confirm at the call site

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method key for TransientTokenUtil.createToken(long)

        private static MethodKey _createTokenMethodKey =
                new MethodKey(TransientTokenUtil.class.getName(), "createToken",
                long.class);

        // Method key for LuceneHelperUtil.getLastGeneration(long)

        private static MethodKey _getLastGenerationMethodKey =
                new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
                long.class);

        private Analyzer _analyzer;

        // Index accessors keyed by company ID; populated lazily by
        // _getIndexAccessor

        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
863    
864    }