001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023    import com.liferay.portal.kernel.resiliency.spi.SPI;
024    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025    import com.liferay.portal.kernel.search.SearchEngineUtil;
026    import com.liferay.portal.kernel.util.FileUtil;
027    import com.liferay.portal.kernel.util.InstanceFactory;
028    import com.liferay.portal.kernel.util.StringPool;
029    import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
030    import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
031    import com.liferay.portal.util.ClassLoaderUtil;
032    import com.liferay.portal.util.PropsValues;
033    
034    import java.io.File;
035    import java.io.IOException;
036    import java.io.InputStream;
037    import java.io.OutputStream;
038    import java.io.Serializable;
039    
040    import java.util.Collection;
041    import java.util.concurrent.Executors;
042    import java.util.concurrent.ScheduledExecutorService;
043    import java.util.concurrent.TimeUnit;
044    import java.util.concurrent.locks.Lock;
045    import java.util.concurrent.locks.ReentrantLock;
046    
047    import org.apache.lucene.analysis.Analyzer;
048    import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
049    import org.apache.lucene.document.Document;
050    import org.apache.lucene.index.IndexReader;
051    import org.apache.lucene.index.IndexWriter;
052    import org.apache.lucene.index.IndexWriterConfig;
053    import org.apache.lucene.index.LogMergePolicy;
054    import org.apache.lucene.index.MergePolicy;
055    import org.apache.lucene.index.MergeScheduler;
056    import org.apache.lucene.index.NoMergePolicy;
057    import org.apache.lucene.index.NoMergeScheduler;
058    import org.apache.lucene.index.Term;
059    import org.apache.lucene.search.IndexSearcher;
060    import org.apache.lucene.search.MatchAllDocsQuery;
061    import org.apache.lucene.search.ScoreDoc;
062    import org.apache.lucene.search.TopDocs;
063    import org.apache.lucene.store.Directory;
064    import org.apache.lucene.store.FSDirectory;
065    import org.apache.lucene.store.MMapDirectory;
066    import org.apache.lucene.store.RAMDirectory;
067    
068    /**
069     * @author Harry Mark
070     * @author Brian Wing Shun Chan
071     * @author Bruno Farache
072     * @author Shuyang Zhou
073     * @author Mate Thurzo
074     */
075    public class IndexAccessorImpl implements IndexAccessor {
076    
077            public IndexAccessorImpl(long companyId) {
078                    _companyId = companyId;
079    
080                    _path = PropsValues.LUCENE_DIR + _companyId + StringPool.SLASH;
081    
082                    try {
083                            if (!SPIUtil.isSPI()) {
084                                    _checkLuceneDir();
085                                    _initIndexWriter();
086                                    _initCommitScheduler();
087    
088                                    _indexSearcherManager = new IndexSearcherManager(_indexWriter);
089                            }
090                            else {
091                                    _indexSearcherManager = new IndexSearcherManager(
092                                            getLuceneDir());
093                            }
094                    }
095                    catch (IOException ioe) {
096                            _log.error(
097                                    "Unable to initialize index searcher manager for company " +
098                                            _companyId,
099                                    ioe);
100                    }
101            }
102    
103            @Override
104            public IndexSearcher acquireIndexSearcher() throws IOException {
105                    return _indexSearcherManager.acquire();
106            }
107    
108            @Override
109            public void addDocument(Document document) throws IOException {
110                    if (SearchEngineUtil.isIndexReadOnly()) {
111                            return;
112                    }
113    
114                    _write(null, document);
115            }
116    
117            @Override
118            public void addDocuments(Collection<Document> documents)
119                    throws IOException {
120    
121                    try {
122                            for (Document document : documents) {
123                                    _indexWriter.addDocument(document);
124                            }
125    
126                            _batchCount++;
127                    }
128                    finally {
129                            _commit();
130                    }
131            }
132    
133            @Override
134            public void close() {
135                    if (SPIUtil.isSPI()) {
136                            return;
137                    }
138    
139                    try {
140                            _indexSearcherManager.close();
141    
142                            _indexWriter.close();
143    
144                            _directory.close();
145                    }
146                    catch (Exception e) {
147                            _log.error("Closing Lucene writer failed for " + _companyId, e);
148                    }
149    
150                    if (_scheduledExecutorService != null) {
151                            _scheduledExecutorService.shutdownNow();
152                    }
153            }
154    
155            @Override
156            public void delete() {
157                    if (SearchEngineUtil.isIndexReadOnly()) {
158                            return;
159                    }
160    
161                    _deleteDirectory();
162            }
163    
164            @Override
165            public void deleteDocuments(Term term) throws IOException {
166                    if (SearchEngineUtil.isIndexReadOnly()) {
167                            return;
168                    }
169    
170                    try {
171                            _indexWriter.deleteDocuments(term);
172    
173                            _batchCount++;
174                    }
175                    finally {
176                            _commit();
177                    }
178            }
179    
180            @Override
181            public void dumpIndex(OutputStream outputStream) throws IOException {
182                    try {
183                            _dumpIndexDeletionPolicy.dump(
184                                    outputStream, _indexWriter, _commitLock);
185                    }
186                    finally {
187                            _indexSearcherManager.invalidate();
188                    }
189            }
190    
191            @Override
192            public long getCompanyId() {
193                    return _companyId;
194            }
195    
196            @Override
197            public long getLastGeneration() {
198                    return _dumpIndexDeletionPolicy.getLastGeneration();
199            }
200    
201            @Override
202            public Directory getLuceneDir() {
203                    if (_directory != null) {
204                            return _directory;
205                    }
206    
207                    if (_log.isDebugEnabled()) {
208                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
209                    }
210    
211                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
212                            _directory = _getLuceneDirFile();
213                    }
214                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
215                                            _LUCENE_STORE_TYPE_JDBC)) {
216    
217                            throw new IllegalArgumentException(
218                                    "Store type JDBC is no longer supported in favor of SOLR");
219                    }
220                    else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
221                            _directory = new RAMDirectory();
222                    }
223                    else {
224                            throw new RuntimeException(
225                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
226                    }
227    
228                    return _directory;
229            }
230    
231            @Override
232            public void invalidate() {
233                    _indexSearcherManager.invalidate();
234            }
235    
236            @Override
237            public void loadIndex(InputStream inputStream) throws IOException {
238                    File tempFile = FileUtil.createTempFile();
239    
240                    try (Directory tempDirectory = FSDirectory.open(tempFile)) {
241                            IndexCommitSerializationUtil.deserializeIndex(
242                                    inputStream, tempDirectory);
243    
244                            _deleteDirectory();
245    
246                            try (IndexReader indexReader = IndexReader.open(
247                                            tempDirectory, false)) {
248    
249                                    if (indexReader.numDocs() > 0) {
250                                            try (IndexSearcher indexSearcher = new IndexSearcher(
251                                                            indexReader)) {
252    
253                                                    TopDocs topDocs = indexSearcher.search(
254                                                            new MatchAllDocsQuery(), indexReader.numDocs());
255    
256                                                    ScoreDoc[] scoreDocs = topDocs.scoreDocs;
257    
258                                                    for (ScoreDoc scoreDoc : scoreDocs) {
259                                                            Document document = indexSearcher.doc(scoreDoc.doc);
260    
261                                                            addDocument(document);
262                                                    }
263                                            }
264                                            catch (IllegalArgumentException iae) {
265                                                    if (_log.isDebugEnabled()) {
266                                                            _log.debug(iae.getMessage());
267                                                    }
268                                            }
269                                    }
270    
271                                    indexReader.flush();
272                            }
273                    }
274    
275                    FileUtil.deltree(tempFile);
276            }
277    
278            @Override
279            public void releaseIndexSearcher(IndexSearcher indexSearcher)
280                    throws IOException {
281    
282                    _indexSearcherManager.release(indexSearcher);
283            }
284    
285            @Override
286            public void updateDocument(Term term, Document document)
287                    throws IOException {
288    
289                    if (SearchEngineUtil.isIndexReadOnly()) {
290                            return;
291                    }
292    
293                    if (_log.isDebugEnabled()) {
294                            _log.debug("Indexing " + document);
295                    }
296    
297                    _write(term, document);
298            }
299    
300            private static void _invalidate(long companyId) {
301                    for (SPI spi : MPIHelperUtil.getSPIs()) {
302                            try {
303                                    RegistrationReference registrationReference =
304                                            spi.getRegistrationReference();
305    
306                                    IntrabandRPCUtil.execute(
307                                            registrationReference,
308                                            new InvalidateProcessCallable(companyId));
309                            }
310                            catch (Exception e) {
311                                    _log.error(
312                                            "Unable to invalidate SPI " + spi + " for company " +
313                                                    companyId,
314                                            e);
315                            }
316                    }
317            }
318    
319            private void _checkLuceneDir() {
320                    if (SearchEngineUtil.isIndexReadOnly()) {
321                            return;
322                    }
323    
324                    try {
325                            Directory directory = getLuceneDir();
326    
327                            if (IndexWriter.isLocked(directory)) {
328                                    IndexWriter.unlock(directory);
329                            }
330                    }
331                    catch (Exception e) {
332                            _log.error("Check Lucene directory failed for " + _companyId, e);
333                    }
334            }
335    
336            private void _commit() throws IOException {
337                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
338                            (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
339    
340                            _doCommit();
341                    }
342            }
343    
344            private void _deleteAll() {
345                    try {
346                            _indexWriter.deleteAll();
347    
348                            _doCommit();
349                    }
350                    catch (Exception e) {
351                            if (_log.isWarnEnabled()) {
352                                    _log.warn("Unable to delete index in directory " + _path);
353                            }
354                    }
355            }
356    
357            private void _deleteDirectory() {
358                    if (_log.isDebugEnabled()) {
359                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
360                    }
361    
362                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
363                            PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
364    
365                            _deleteAll();
366                    }
367                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
368                                            _LUCENE_STORE_TYPE_JDBC)) {
369    
370                            throw new IllegalArgumentException(
371                                    "Store type JDBC is no longer supported in favor of SOLR");
372                    }
373                    else {
374                            throw new RuntimeException(
375                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
376                    }
377            }
378    
379            private void _doCommit() throws IOException {
380                    if (_indexWriter != null) {
381                            _commitLock.lock();
382    
383                            try {
384                                    _indexWriter.commit();
385                            }
386                            finally {
387                                    _commitLock.unlock();
388    
389                                    _indexSearcherManager.invalidate();
390    
391                                    _invalidate(_companyId);
392                            }
393                    }
394    
395                    _batchCount = 0;
396            }
397    
398            private Directory _getLuceneDirFile() {
399                    Directory directory = null;
400    
401                    try {
402                            if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
403                                    directory = new MMapDirectory(new File(_path));
404                            }
405                            else {
406                                    directory = FSDirectory.open(new File(_path));
407                            }
408                    }
409                    catch (IOException ioe) {
410                            if (directory != null) {
411                                    try {
412                                            directory.close();
413                                    }
414                                    catch (Exception e) {
415                                    }
416                            }
417                    }
418    
419                    return directory;
420            }
421    
422            private MergePolicy _getMergePolicy() throws Exception {
423                    if (PropsValues.LUCENE_MERGE_POLICY.equals(
424                                    NoMergePolicy.class.getName())) {
425    
426                            return NoMergePolicy.NO_COMPOUND_FILES;
427                    }
428    
429                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
430    
431                    MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
432                            classLoader, PropsValues.LUCENE_MERGE_POLICY);
433    
434                    if (mergePolicy instanceof LogMergePolicy) {
435                            LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
436    
437                            logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
438                    }
439    
440                    return mergePolicy;
441            }
442    
443            private MergeScheduler _getMergeScheduler() throws Exception {
444                    if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
445                                    NoMergeScheduler.class.getName())) {
446    
447                            return NoMergeScheduler.INSTANCE;
448                    }
449    
450                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
451    
452                    return (MergeScheduler)InstanceFactory.newInstance(
453                            classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
454            }
455    
456            private void _initCommitScheduler() {
457                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
458                            (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
459    
460                            return;
461                    }
462    
463                    _scheduledExecutorService =
464                            Executors.newSingleThreadScheduledExecutor();
465    
466                    Runnable runnable = new Runnable() {
467    
468                            @Override
469                            public void run() {
470                                    try {
471                                            if (_batchCount > 0) {
472                                                    _doCommit();
473                                            }
474                                    }
475                                    catch (IOException ioe) {
476                                            _log.error("Could not run scheduled commit", ioe);
477                                    }
478                            }
479    
480                    };
481    
482                    _scheduledExecutorService.scheduleWithFixedDelay(
483                            runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
484                            TimeUnit.MILLISECONDS);
485            }
486    
487            private void _initIndexWriter() {
488                    try {
489                            Analyzer analyzer = new LimitTokenCountAnalyzer(
490                                    LuceneHelperUtil.getAnalyzer(),
491                                    PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
492    
493                            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
494                                    LuceneHelperUtil.getVersion(), analyzer);
495    
496                            indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
497                            indexWriterConfig.setMergePolicy(_getMergePolicy());
498                            indexWriterConfig.setMergeScheduler(_getMergeScheduler());
499                            indexWriterConfig.setRAMBufferSizeMB(
500                                    PropsValues.LUCENE_BUFFER_SIZE);
501    
502                            _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
503    
504                            if (!IndexReader.indexExists(getLuceneDir())) {
505    
506                                    // Workaround for LUCENE-2386
507    
508                                    if (_log.isDebugEnabled()) {
509                                            _log.debug("Creating missing index");
510                                    }
511    
512                                    _indexWriter.commit();
513                            }
514                    }
515                    catch (Exception e) {
516                            _log.error(
517                                    "Initializing Lucene writer failed for " + _companyId, e);
518                    }
519            }
520    
521            private void _write(Term term, Document document) throws IOException {
522                    try {
523                            if (term != null) {
524                                    _indexWriter.updateDocument(term, document);
525                            }
526                            else {
527                                    _indexWriter.addDocument(document);
528                            }
529    
530                            _batchCount++;
531                    }
532                    finally {
533                            _commit();
534                    }
535            }
536    
537            private static final String _LUCENE_STORE_TYPE_FILE = "file";
538    
539            private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
540    
541            private static final String _LUCENE_STORE_TYPE_RAM = "ram";
542    
543            private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
544    
545            private volatile int _batchCount;
546            private Lock _commitLock = new ReentrantLock();
547            private long _companyId;
548            private Directory _directory;
549            private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
550                    new DumpIndexDeletionPolicy();
551            private IndexSearcherManager _indexSearcherManager;
552            private IndexWriter _indexWriter;
553            private String _path;
554            private ScheduledExecutorService _scheduledExecutorService;
555    
556            private static class InvalidateProcessCallable
557                    implements ProcessCallable<Serializable> {
558    
559                    public InvalidateProcessCallable(long companyId) {
560                            _companyId = companyId;
561                    }
562    
563                    @Override
564                    public Serializable call() {
565                            IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
566                                    _companyId);
567    
568                            indexAccessor.invalidate();
569    
570                            _invalidate(_companyId);
571    
572                            return null;
573                    }
574    
575                    private static final long serialVersionUID = 1L;
576    
577                    private final long _companyId;
578    
579            }
580    
581    }