001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023    import com.liferay.portal.kernel.resiliency.spi.SPI;
024    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025    import com.liferay.portal.kernel.search.SearchEngineUtil;
026    import com.liferay.portal.kernel.util.FileUtil;
027    import com.liferay.portal.kernel.util.InstanceFactory;
028    import com.liferay.portal.kernel.util.StringPool;
029    import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
030    import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
031    import com.liferay.portal.util.ClassLoaderUtil;
032    import com.liferay.portal.util.PropsValues;
033    
034    import java.io.File;
035    import java.io.IOException;
036    import java.io.InputStream;
037    import java.io.OutputStream;
038    import java.io.Serializable;
039    
040    import java.util.Collection;
041    import java.util.Map;
042    import java.util.concurrent.ConcurrentHashMap;
043    import java.util.concurrent.Executors;
044    import java.util.concurrent.ScheduledExecutorService;
045    import java.util.concurrent.TimeUnit;
046    import java.util.concurrent.locks.Lock;
047    import java.util.concurrent.locks.ReentrantLock;
048    
049    import org.apache.lucene.analysis.Analyzer;
050    import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
051    import org.apache.lucene.document.Document;
052    import org.apache.lucene.index.IndexReader;
053    import org.apache.lucene.index.IndexWriter;
054    import org.apache.lucene.index.IndexWriterConfig;
055    import org.apache.lucene.index.LogMergePolicy;
056    import org.apache.lucene.index.MergePolicy;
057    import org.apache.lucene.index.MergeScheduler;
058    import org.apache.lucene.index.NoMergePolicy;
059    import org.apache.lucene.index.NoMergeScheduler;
060    import org.apache.lucene.index.Term;
061    import org.apache.lucene.search.IndexSearcher;
062    import org.apache.lucene.search.MatchAllDocsQuery;
063    import org.apache.lucene.search.ScoreDoc;
064    import org.apache.lucene.search.TopDocs;
065    import org.apache.lucene.store.Directory;
066    import org.apache.lucene.store.FSDirectory;
067    import org.apache.lucene.store.MMapDirectory;
068    import org.apache.lucene.store.RAMDirectory;
069    
070    /**
071     * @author Harry Mark
072     * @author Brian Wing Shun Chan
073     * @author Bruno Farache
074     * @author Shuyang Zhou
075     * @author Mate Thurzo
076     */
077    public class IndexAccessorImpl implements IndexAccessor {
078    
079            public IndexAccessorImpl(long companyId) {
080                    _companyId = companyId;
081    
082                    _path = PropsValues.LUCENE_DIR.concat(
083                            String.valueOf(_companyId)).concat(StringPool.SLASH);
084    
085                    try {
086                            if (!SPIUtil.isSPI()) {
087                                    _checkLuceneDir();
088                                    _initIndexWriter();
089                                    _initCommitScheduler();
090    
091                                    _indexSearcherManager = new IndexSearcherManager(_indexWriter);
092                            }
093                            else {
094                                    _indexSearcherManager = new IndexSearcherManager(
095                                            getLuceneDir());
096                            }
097                    }
098                    catch (IOException ioe) {
099                            _log.error(
100                                    "Unable to initialize index searcher manager for company " +
101                                            _companyId,
102                                    ioe);
103                    }
104            }
105    
106            @Override
107            public IndexSearcher acquireIndexSearcher() throws IOException {
108                    return _indexSearcherManager.acquire();
109            }
110    
111            @Override
112            public void addDocument(Document document) throws IOException {
113                    if (SearchEngineUtil.isIndexReadOnly()) {
114                            return;
115                    }
116    
117                    _write(null, document);
118            }
119    
120            @Override
121            public void addDocuments(Collection<Document> documents)
122                    throws IOException {
123    
124                    try {
125                            for (Document document : documents) {
126                                    _indexWriter.addDocument(document);
127                            }
128    
129                            _batchCount++;
130                    }
131                    finally {
132                            _commit();
133                    }
134            }
135    
136            @Override
137            public void close() {
138                    if (SPIUtil.isSPI()) {
139                            return;
140                    }
141    
142                    try {
143                            _indexSearcherManager.close();
144    
145                            _indexWriter.close();
146    
147                            _directory.close();
148                    }
149                    catch (Exception e) {
150                            _log.error("Closing Lucene writer failed for " + _companyId, e);
151                    }
152    
153                    if (_scheduledExecutorService != null) {
154                            _scheduledExecutorService.shutdownNow();
155                    }
156            }
157    
158            @Override
159            public void delete() {
160                    if (SearchEngineUtil.isIndexReadOnly()) {
161                            return;
162                    }
163    
164                    _deleteDirectory();
165            }
166    
167            @Override
168            public void deleteDocuments(Term term) throws IOException {
169                    if (SearchEngineUtil.isIndexReadOnly()) {
170                            return;
171                    }
172    
173                    try {
174                            _indexWriter.deleteDocuments(term);
175    
176                            _batchCount++;
177                    }
178                    finally {
179                            _commit();
180                    }
181            }
182    
183            @Override
184            public void dumpIndex(OutputStream outputStream) throws IOException {
185                    try {
186                            _dumpIndexDeletionPolicy.dump(
187                                    outputStream, _indexWriter, _commitLock);
188                    }
189                    finally {
190                            _indexSearcherManager.invalidate();
191                    }
192            }
193    
194            @Override
195            public long getCompanyId() {
196                    return _companyId;
197            }
198    
199            @Override
200            public long getLastGeneration() {
201                    return _dumpIndexDeletionPolicy.getLastGeneration();
202            }
203    
204            @Override
205            public Directory getLuceneDir() {
206                    if (_directory != null) {
207                            return _directory;
208                    }
209    
210                    if (_log.isDebugEnabled()) {
211                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
212                    }
213    
214                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
215                            _directory = _getLuceneDirFile();
216                    }
217                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
218                                            _LUCENE_STORE_TYPE_JDBC)) {
219    
220                            throw new IllegalArgumentException(
221                                    "Store type JDBC is no longer supported in favor of SOLR");
222                    }
223                    else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
224                            _directory = new RAMDirectory();
225                    }
226                    else {
227                            throw new RuntimeException(
228                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
229                    }
230    
231                    return _directory;
232            }
233    
234            @Override
235            public void invalidate() {
236                    _indexSearcherManager.invalidate();
237            }
238    
239            @Override
240            public void loadIndex(InputStream inputStream) throws IOException {
241                    File tempFile = FileUtil.createTempFile();
242    
243                    Directory tempDirectory = FSDirectory.open(tempFile);
244    
245                    IndexCommitSerializationUtil.deserializeIndex(
246                            inputStream, tempDirectory);
247    
248                    _deleteDirectory();
249    
250                    IndexReader indexReader = IndexReader.open(tempDirectory, false);
251    
252                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
253    
254                    try {
255                            TopDocs topDocs = indexSearcher.search(
256                                    new MatchAllDocsQuery(), indexReader.numDocs());
257    
258                            ScoreDoc[] scoreDocs = topDocs.scoreDocs;
259    
260                            for (ScoreDoc scoreDoc : scoreDocs) {
261                                    Document document = indexSearcher.doc(scoreDoc.doc);
262    
263                                    addDocument(document);
264                            }
265                    }
266                    catch (IllegalArgumentException iae) {
267                            if (_log.isDebugEnabled()) {
268                                    _log.debug(iae.getMessage());
269                            }
270                    }
271    
272                    indexSearcher.close();
273    
274                    indexReader.flush();
275                    indexReader.close();
276    
277                    tempDirectory.close();
278    
279                    FileUtil.deltree(tempFile);
280            }
281    
282            @Override
283            public void releaseIndexSearcher(IndexSearcher indexSearcher)
284                    throws IOException {
285    
286                    _indexSearcherManager.release(indexSearcher);
287            }
288    
289            @Override
290            public void updateDocument(Term term, Document document)
291                    throws IOException {
292    
293                    if (SearchEngineUtil.isIndexReadOnly()) {
294                            return;
295                    }
296    
297                    if (_log.isDebugEnabled()) {
298                            _log.debug("Indexing " + document);
299                    }
300    
301                    _write(term, document);
302            }
303    
304            private static void _invalidate(long companyId) {
305                    for (SPI spi : MPIHelperUtil.getSPIs()) {
306                            try {
307                                    RegistrationReference registrationReference =
308                                            spi.getRegistrationReference();
309    
310                                    IntrabandRPCUtil.execute(
311                                            registrationReference,
312                                            new InvalidateProcessCallable(companyId));
313                            }
314                            catch (Exception e) {
315                                    _log.error(
316                                            "Unable to invalidate SPI " + spi + " for company " +
317                                                    companyId,
318                                            e);
319                            }
320                    }
321            }
322    
323            private void _checkLuceneDir() {
324                    if (SearchEngineUtil.isIndexReadOnly()) {
325                            return;
326                    }
327    
328                    try {
329                            Directory directory = getLuceneDir();
330    
331                            if (IndexWriter.isLocked(directory)) {
332                                    IndexWriter.unlock(directory);
333                            }
334                    }
335                    catch (Exception e) {
336                            _log.error("Check Lucene directory failed for " + _companyId, e);
337                    }
338            }
339    
340            private void _commit() throws IOException {
341                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
342                            (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
343    
344                            _doCommit();
345                    }
346            }
347    
348            private void _deleteAll() {
349                    try {
350                            _indexWriter.deleteAll();
351    
352                            _doCommit();
353                    }
354                    catch (Exception e) {
355                            if (_log.isWarnEnabled()) {
356                                    _log.warn("Unable to delete index in directory " + _path);
357                            }
358                    }
359            }
360    
361            private void _deleteDirectory() {
362                    if (_log.isDebugEnabled()) {
363                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
364                    }
365    
366                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
367                            PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
368    
369                            _deleteAll();
370                    }
371                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
372                                            _LUCENE_STORE_TYPE_JDBC)) {
373    
374                            throw new IllegalArgumentException(
375                                    "Store type JDBC is no longer supported in favor of SOLR");
376                    }
377                    else {
378                            throw new RuntimeException(
379                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
380                    }
381            }
382    
383            private void _doCommit() throws IOException {
384                    if (_indexWriter != null) {
385                            _commitLock.lock();
386    
387                            try {
388                                    _indexWriter.commit();
389                            }
390                            finally {
391                                    _commitLock.unlock();
392    
393                                    _indexSearcherManager.invalidate();
394    
395                                    _invalidate(_companyId);
396                            }
397                    }
398    
399                    _batchCount = 0;
400            }
401    
402            private Directory _getLuceneDirFile() {
403                    Directory directory = null;
404    
405                    try {
406                            if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
407                                    directory = new MMapDirectory(new File(_path));
408                            }
409                            else {
410                                    directory = FSDirectory.open(new File(_path));
411                            }
412                    }
413                    catch (IOException ioe) {
414                            if (directory != null) {
415                                    try {
416                                            directory.close();
417                                    }
418                                    catch (Exception e) {
419                                    }
420                            }
421                    }
422    
423                    return directory;
424            }
425    
426            private MergePolicy _getMergePolicy() throws Exception {
427                    if (PropsValues.LUCENE_MERGE_POLICY.equals(
428                                    NoMergePolicy.class.getName())) {
429    
430                            return NoMergePolicy.NO_COMPOUND_FILES;
431                    }
432    
433                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
434    
435                    MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
436                            classLoader, PropsValues.LUCENE_MERGE_POLICY);
437    
438                    if (mergePolicy instanceof LogMergePolicy) {
439                            LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
440    
441                            logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
442                    }
443    
444                    return mergePolicy;
445            }
446    
447            private MergeScheduler _getMergeScheduler() throws Exception {
448                    if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
449                                    NoMergeScheduler.class.getName())) {
450    
451                            return NoMergeScheduler.INSTANCE;
452                    }
453    
454                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
455    
456                    return (MergeScheduler)InstanceFactory.newInstance(
457                            classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
458            }
459    
460            private void _initCommitScheduler() {
461                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
462                            (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
463    
464                            return;
465                    }
466    
467                    _scheduledExecutorService =
468                            Executors.newSingleThreadScheduledExecutor();
469    
470                    Runnable runnable = new Runnable() {
471    
472                            @Override
473                            public void run() {
474                                    try {
475                                            if (_batchCount > 0) {
476                                                    _doCommit();
477                                            }
478                                    }
479                                    catch (IOException ioe) {
480                                            _log.error("Could not run scheduled commit", ioe);
481                                    }
482                            }
483    
484                    };
485    
486                    _scheduledExecutorService.scheduleWithFixedDelay(
487                            runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
488                            TimeUnit.MILLISECONDS);
489            }
490    
491            private void _initIndexWriter() {
492                    try {
493                            Analyzer analyzer = new LimitTokenCountAnalyzer(
494                                    LuceneHelperUtil.getAnalyzer(),
495                                    PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
496    
497                            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
498                                    LuceneHelperUtil.getVersion(), analyzer);
499    
500                            indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
501                            indexWriterConfig.setMergePolicy(_getMergePolicy());
502                            indexWriterConfig.setMergeScheduler(_getMergeScheduler());
503                            indexWriterConfig.setRAMBufferSizeMB(
504                                    PropsValues.LUCENE_BUFFER_SIZE);
505    
506                            _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
507    
508                            if (!IndexReader.indexExists(getLuceneDir())) {
509    
510                                    // Workaround for LUCENE-2386
511    
512                                    if (_log.isDebugEnabled()) {
513                                            _log.debug("Creating missing index");
514                                    }
515    
516                                    _indexWriter.commit();
517                            }
518                    }
519                    catch (Exception e) {
520                            _log.error(
521                                    "Initializing Lucene writer failed for " + _companyId, e);
522                    }
523            }
524    
525            private void _write(Term term, Document document) throws IOException {
526                    try {
527                            if (term != null) {
528                                    _indexWriter.updateDocument(term, document);
529                            }
530                            else {
531                                    _indexWriter.addDocument(document);
532                            }
533    
534                            _batchCount++;
535                    }
536                    finally {
537                            _commit();
538                    }
539            }
540    
541            private static final String _LUCENE_STORE_TYPE_FILE = "file";
542    
543            private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
544    
545            private static final String _LUCENE_STORE_TYPE_RAM = "ram";
546    
547            private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
548    
549            private volatile int _batchCount;
550            private Lock _commitLock = new ReentrantLock();
551            private long _companyId;
552            private Directory _directory;
553            private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
554                    new DumpIndexDeletionPolicy();
555            private IndexSearcherManager _indexSearcherManager;
556            private IndexWriter _indexWriter;
557            private String _path;
558            private Map<String, Directory> _ramDirectories =
559                    new ConcurrentHashMap<String, Directory>();
560            private ScheduledExecutorService _scheduledExecutorService;
561    
562            private static class InvalidateProcessCallable
563                    implements ProcessCallable<Serializable> {
564    
565                    public InvalidateProcessCallable(long companyId) {
566                            _companyId = companyId;
567                    }
568    
569                    @Override
570                    public Serializable call() {
571                            IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
572                                    _companyId);
573    
574                            indexAccessor.invalidate();
575    
576                            _invalidate(_companyId);
577    
578                            return null;
579                    }
580    
581                    private static final long serialVersionUID = 1L;
582    
583                    private final long _companyId;
584    
585            }
586    
587    }