001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023    import com.liferay.portal.kernel.resiliency.spi.SPI;
024    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025    import com.liferay.portal.kernel.search.SearchEngineUtil;
026    import com.liferay.portal.kernel.util.FileUtil;
027    import com.liferay.portal.kernel.util.InstanceFactory;
028    import com.liferay.portal.kernel.util.OSDetector;
029    import com.liferay.portal.kernel.util.StringPool;
030    import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
031    import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
032    import com.liferay.portal.util.ClassLoaderUtil;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.io.File;
036    import java.io.IOException;
037    import java.io.InputStream;
038    import java.io.OutputStream;
039    import java.io.Serializable;
040    
041    import java.util.Collection;
042    import java.util.Map;
043    import java.util.concurrent.ConcurrentHashMap;
044    import java.util.concurrent.Executors;
045    import java.util.concurrent.ScheduledExecutorService;
046    import java.util.concurrent.TimeUnit;
047    import java.util.concurrent.locks.Lock;
048    import java.util.concurrent.locks.ReentrantLock;
049    
050    import org.apache.lucene.analysis.Analyzer;
051    import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
052    import org.apache.lucene.document.Document;
053    import org.apache.lucene.index.IndexReader;
054    import org.apache.lucene.index.IndexWriter;
055    import org.apache.lucene.index.IndexWriterConfig;
056    import org.apache.lucene.index.LogMergePolicy;
057    import org.apache.lucene.index.MergePolicy;
058    import org.apache.lucene.index.MergeScheduler;
059    import org.apache.lucene.index.NoMergePolicy;
060    import org.apache.lucene.index.NoMergeScheduler;
061    import org.apache.lucene.index.Term;
062    import org.apache.lucene.search.IndexSearcher;
063    import org.apache.lucene.store.Directory;
064    import org.apache.lucene.store.FSDirectory;
065    import org.apache.lucene.store.MMapDirectory;
066    import org.apache.lucene.store.RAMDirectory;
067    
068    /**
069     * @author Harry Mark
070     * @author Brian Wing Shun Chan
071     * @author Bruno Farache
072     * @author Shuyang Zhou
073     * @author Mate Thurzo
074     */
075    public class IndexAccessorImpl implements IndexAccessor {
076    
077            public IndexAccessorImpl(long companyId) {
078                    _companyId = companyId;
079    
080                    _path = PropsValues.LUCENE_DIR.concat(
081                            String.valueOf(_companyId)).concat(StringPool.SLASH);
082    
083                    try {
084                            if (!SPIUtil.isSPI()) {
085                                    _checkLuceneDir();
086                                    _initIndexWriter();
087                                    _initCommitScheduler();
088    
089                                    _indexSearcherManager = new IndexSearcherManager(_indexWriter);
090                            }
091                            else {
092                                    _indexSearcherManager = new IndexSearcherManager(
093                                            getLuceneDir());
094                            }
095                    }
096                    catch (IOException ioe) {
097                            _log.error(
098                                    "Unable to initialize index searcher manager for company " +
099                                            _companyId,
100                                    ioe);
101                    }
102            }
103    
104            @Override
105            public IndexSearcher acquireIndexSearcher() throws IOException {
106                    return _indexSearcherManager.acquire();
107            }
108    
109            @Override
110            public void addDocument(Document document) throws IOException {
111                    if (SearchEngineUtil.isIndexReadOnly()) {
112                            return;
113                    }
114    
115                    _write(null, document);
116            }
117    
118            @Override
119            public void addDocuments(Collection<Document> documents)
120                    throws IOException {
121    
122                    try {
123                            for (Document document : documents) {
124                                    _indexWriter.addDocument(document);
125                            }
126    
127                            _batchCount++;
128                    }
129                    finally {
130                            _commit();
131                    }
132            }
133    
134            @Override
135            public void close() {
136                    if (SPIUtil.isSPI()) {
137                            return;
138                    }
139    
140                    try {
141                            _indexSearcherManager.close();
142    
143                            _indexWriter.close();
144    
145                            _directory.close();
146                    }
147                    catch (Exception e) {
148                            _log.error("Closing Lucene writer failed for " + _companyId, e);
149                    }
150    
151                    if (_scheduledExecutorService != null) {
152                            _scheduledExecutorService.shutdownNow();
153                    }
154            }
155    
156            @Override
157            public void delete() {
158                    if (SearchEngineUtil.isIndexReadOnly()) {
159                            return;
160                    }
161    
162                    _deleteDirectory();
163            }
164    
165            @Override
166            public void deleteDocuments(Term term) throws IOException {
167                    if (SearchEngineUtil.isIndexReadOnly()) {
168                            return;
169                    }
170    
171                    try {
172                            _indexWriter.deleteDocuments(term);
173    
174                            _batchCount++;
175                    }
176                    finally {
177                            _commit();
178                    }
179            }
180    
181            @Override
182            public void dumpIndex(OutputStream outputStream) throws IOException {
183                    try {
184                            _dumpIndexDeletionPolicy.dump(
185                                    outputStream, _indexWriter, _commitLock);
186                    }
187                    finally {
188                            _indexSearcherManager.invalidate();
189                    }
190            }
191    
192            @Override
193            public long getCompanyId() {
194                    return _companyId;
195            }
196    
197            @Override
198            public long getLastGeneration() {
199                    return _dumpIndexDeletionPolicy.getLastGeneration();
200            }
201    
202            @Override
203            public Directory getLuceneDir() {
204                    if (_directory != null) {
205                            return _directory;
206                    }
207    
208                    if (_log.isDebugEnabled()) {
209                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
210                    }
211    
212                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
213                            _directory = _getLuceneDirFile();
214                    }
215                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
216                                            _LUCENE_STORE_TYPE_JDBC)) {
217    
218                            throw new IllegalArgumentException(
219                                    "Store type JDBC is no longer supported in favor of SOLR");
220                    }
221                    else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
222                            _directory = new RAMDirectory();
223                    }
224                    else {
225                            throw new RuntimeException(
226                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
227                    }
228    
229                    return _directory;
230            }
231    
232            @Override
233            public void invalidate() {
234                    _indexSearcherManager.invalidate();
235            }
236    
237            @Override
238            public void loadIndex(InputStream inputStream) throws IOException {
239                    File tempFile = FileUtil.createTempFile();
240    
241                    Directory tempDirectory = FSDirectory.open(tempFile);
242    
243                    IndexCommitSerializationUtil.deserializeIndex(
244                            inputStream, tempDirectory);
245    
246                    _indexSearcherManager.close();
247    
248                    _indexWriter.close();
249    
250                    Directory luceneDirectory = getLuceneDir();
251    
252                    for (String fileName : luceneDirectory.listAll()) {
253                            luceneDirectory.deleteFile(fileName);
254                    }
255    
256                    for (String fileName : tempDirectory.listAll()) {
257                            tempDirectory.copy(getLuceneDir(), fileName, fileName);
258                    }
259    
260                    _initIndexWriter();
261    
262                    _indexSearcherManager = new IndexSearcherManager(_indexWriter);
263    
264                    tempDirectory.close();
265    
266                    FileUtil.deltree(tempFile);
267            }
268    
269            @Override
270            public void releaseIndexSearcher(IndexSearcher indexSearcher)
271                    throws IOException {
272    
273                    _indexSearcherManager.release(indexSearcher);
274            }
275    
276            @Override
277            public void updateDocument(Term term, Document document)
278                    throws IOException {
279    
280                    if (SearchEngineUtil.isIndexReadOnly()) {
281                            return;
282                    }
283    
284                    if (_log.isDebugEnabled()) {
285                            _log.debug("Indexing " + document);
286                    }
287    
288                    _write(term, document);
289            }
290    
291            private static void _invalidate(long companyId) {
292                    for (SPI spi : MPIHelperUtil.getSPIs()) {
293                            try {
294                                    RegistrationReference registrationReference =
295                                            spi.getRegistrationReference();
296    
297                                    IntrabandRPCUtil.execute(
298                                            registrationReference,
299                                            new InvalidateProcessCallable(companyId));
300                            }
301                            catch (Exception e) {
302                                    _log.error(
303                                            "Unable to invalidate SPI " + spi + " for company " +
304                                                    companyId,
305                                            e);
306                            }
307                    }
308            }
309    
310            private void _checkLuceneDir() {
311                    if (SearchEngineUtil.isIndexReadOnly()) {
312                            return;
313                    }
314    
315                    try {
316                            Directory directory = getLuceneDir();
317    
318                            if (IndexWriter.isLocked(directory)) {
319                                    IndexWriter.unlock(directory);
320                            }
321                    }
322                    catch (Exception e) {
323                            _log.error("Check Lucene directory failed for " + _companyId, e);
324                    }
325            }
326    
327            private void _commit() throws IOException {
328                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
329                            (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
330    
331                            _doCommit();
332                    }
333            }
334    
335            private void _deleteAll() {
336                    try {
337                            _indexWriter.deleteAll();
338    
339                            _doCommit();
340                    }
341                    catch (Exception e) {
342                            if (_log.isWarnEnabled()) {
343                                    _log.warn("Unable to delete index in directory " + _path);
344                            }
345                    }
346            }
347    
348            private void _deleteDirectory() {
349                    if (_log.isDebugEnabled()) {
350                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
351                    }
352    
353                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
354                            PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
355    
356                            _deleteAll();
357                    }
358                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
359                                            _LUCENE_STORE_TYPE_JDBC)) {
360    
361                            throw new IllegalArgumentException(
362                                    "Store type JDBC is no longer supported in favor of SOLR");
363                    }
364                    else {
365                            throw new RuntimeException(
366                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
367                    }
368            }
369    
370            private void _doCommit() throws IOException {
371                    if (_indexWriter != null) {
372                            _commitLock.lock();
373    
374                            try {
375                                    _indexWriter.commit();
376                            }
377                            finally {
378                                    _commitLock.unlock();
379    
380                                    _indexSearcherManager.invalidate();
381    
382                                    _invalidate(_companyId);
383                            }
384                    }
385    
386                    _batchCount = 0;
387            }
388    
389            private Directory _getLuceneDirFile() {
390                    Directory directory = null;
391    
392                    try {
393                            if (!OSDetector.isWindows() &&
394                                    PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
395    
396                                    directory = new MMapDirectory(new File(_path));
397                            }
398                            else {
399                                    directory = FSDirectory.open(new File(_path));
400                            }
401                    }
402                    catch (IOException ioe) {
403                            if (directory != null) {
404                                    try {
405                                            directory.close();
406                                    }
407                                    catch (Exception e) {
408                                    }
409                            }
410                    }
411    
412                    return directory;
413            }
414    
415            private MergePolicy _getMergePolicy() throws Exception {
416                    if (PropsValues.LUCENE_MERGE_POLICY.equals(
417                                    NoMergePolicy.class.getName())) {
418    
419                            return NoMergePolicy.NO_COMPOUND_FILES;
420                    }
421    
422                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
423    
424                    MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
425                            classLoader, PropsValues.LUCENE_MERGE_POLICY);
426    
427                    if (mergePolicy instanceof LogMergePolicy) {
428                            LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
429    
430                            logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
431                    }
432    
433                    return mergePolicy;
434            }
435    
436            private MergeScheduler _getMergeScheduler() throws Exception {
437                    if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
438                                    NoMergeScheduler.class.getName())) {
439    
440                            return NoMergeScheduler.INSTANCE;
441                    }
442    
443                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
444    
445                    return (MergeScheduler)InstanceFactory.newInstance(
446                            classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
447            }
448    
449            private void _initCommitScheduler() {
450                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
451                            (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
452    
453                            return;
454                    }
455    
456                    _scheduledExecutorService =
457                            Executors.newSingleThreadScheduledExecutor();
458    
459                    Runnable runnable = new Runnable() {
460    
461                            @Override
462                            public void run() {
463                                    try {
464                                            if (_batchCount > 0) {
465                                                    _doCommit();
466                                            }
467                                    }
468                                    catch (IOException ioe) {
469                                            _log.error("Could not run scheduled commit", ioe);
470                                    }
471                            }
472    
473                    };
474    
475                    _scheduledExecutorService.scheduleWithFixedDelay(
476                            runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
477                            TimeUnit.MILLISECONDS);
478            }
479    
480            private void _initIndexWriter() {
481                    try {
482                            Analyzer analyzer = new LimitTokenCountAnalyzer(
483                                    LuceneHelperUtil.getAnalyzer(),
484                                    PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
485    
486                            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
487                                    LuceneHelperUtil.getVersion(), analyzer);
488    
489                            indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
490                            indexWriterConfig.setMergePolicy(_getMergePolicy());
491                            indexWriterConfig.setMergeScheduler(_getMergeScheduler());
492                            indexWriterConfig.setRAMBufferSizeMB(
493                                    PropsValues.LUCENE_BUFFER_SIZE);
494    
495                            _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
496    
497                            if (!IndexReader.indexExists(getLuceneDir())) {
498    
499                                    // Workaround for LUCENE-2386
500    
501                                    if (_log.isDebugEnabled()) {
502                                            _log.debug("Creating missing index");
503                                    }
504    
505                                    _indexWriter.commit();
506                            }
507                    }
508                    catch (Exception e) {
509                            _log.error(
510                                    "Initializing Lucene writer failed for " + _companyId, e);
511                    }
512            }
513    
514            private void _write(Term term, Document document) throws IOException {
515                    try {
516                            if (term != null) {
517                                    _indexWriter.updateDocument(term, document);
518                            }
519                            else {
520                                    _indexWriter.addDocument(document);
521                            }
522    
523                            _batchCount++;
524                    }
525                    finally {
526                            _commit();
527                    }
528            }
529    
530            private static final String _LUCENE_STORE_TYPE_FILE = "file";
531    
532            private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
533    
534            private static final String _LUCENE_STORE_TYPE_RAM = "ram";
535    
536            private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
537    
538            private volatile int _batchCount;
539            private Lock _commitLock = new ReentrantLock();
540            private long _companyId;
541            private Directory _directory;
542            private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
543                    new DumpIndexDeletionPolicy();
544            private IndexSearcherManager _indexSearcherManager;
545            private IndexWriter _indexWriter;
546            private String _path;
547            private Map<String, Directory> _ramDirectories =
548                    new ConcurrentHashMap<String, Directory>();
549            private ScheduledExecutorService _scheduledExecutorService;
550    
551            private static class InvalidateProcessCallable
552                    implements ProcessCallable<Serializable> {
553    
554                    public InvalidateProcessCallable(long companyId) {
555                            _companyId = companyId;
556                    }
557    
558                    @Override
559                    public Serializable call() {
560                            IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
561                                    _companyId);
562    
563                            indexAccessor.invalidate();
564    
565                            _invalidate(_companyId);
566    
567                            return null;
568                    }
569    
570                    private static final long serialVersionUID = 1L;
571    
572                    private final long _companyId;
573    
574            }
575    
576    }