001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
020    import com.liferay.portal.kernel.search.SearchEngineUtil;
021    import com.liferay.portal.kernel.util.FileUtil;
022    import com.liferay.portal.kernel.util.InstanceFactory;
023    import com.liferay.portal.kernel.util.StringPool;
024    import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
025    import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
026    import com.liferay.portal.util.ClassLoaderUtil;
027    import com.liferay.portal.util.PropsValues;
028    
029    import java.io.File;
030    import java.io.IOException;
031    import java.io.InputStream;
032    import java.io.OutputStream;
033    
034    import java.util.Collection;
035    import java.util.Map;
036    import java.util.concurrent.ConcurrentHashMap;
037    import java.util.concurrent.Executors;
038    import java.util.concurrent.ScheduledExecutorService;
039    import java.util.concurrent.TimeUnit;
040    import java.util.concurrent.locks.Lock;
041    import java.util.concurrent.locks.ReentrantLock;
042    
043    import org.apache.lucene.analysis.Analyzer;
044    import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
045    import org.apache.lucene.document.Document;
046    import org.apache.lucene.index.IndexReader;
047    import org.apache.lucene.index.IndexWriter;
048    import org.apache.lucene.index.IndexWriterConfig;
049    import org.apache.lucene.index.LogMergePolicy;
050    import org.apache.lucene.index.MergePolicy;
051    import org.apache.lucene.index.MergeScheduler;
052    import org.apache.lucene.index.NoMergePolicy;
053    import org.apache.lucene.index.NoMergeScheduler;
054    import org.apache.lucene.index.Term;
055    import org.apache.lucene.search.IndexSearcher;
056    import org.apache.lucene.search.MatchAllDocsQuery;
057    import org.apache.lucene.search.ScoreDoc;
058    import org.apache.lucene.search.TopDocs;
059    import org.apache.lucene.store.Directory;
060    import org.apache.lucene.store.FSDirectory;
061    import org.apache.lucene.store.MMapDirectory;
062    import org.apache.lucene.store.RAMDirectory;
063    
064    /**
065     * @author Harry Mark
066     * @author Brian Wing Shun Chan
067     * @author Bruno Farache
068     * @author Shuyang Zhou
069     * @author Mate Thurzo
070     */
071    public class IndexAccessorImpl implements IndexAccessor {
072    
073            public IndexAccessorImpl(long companyId) {
074                    _companyId = companyId;
075    
076                    if (!SPIUtil.isSPI()) {
077                            _checkLuceneDir();
078                            _initIndexWriter();
079                            _initCommitScheduler();
080                    }
081            }
082    
083            @Override
084            public void addDocument(Document document) throws IOException {
085                    if (SearchEngineUtil.isIndexReadOnly()) {
086                            return;
087                    }
088    
089                    _write(null, document);
090            }
091    
092            @Override
093            public void addDocuments(Collection<Document> documents)
094                    throws IOException {
095    
096                    try {
097                            for (Document document : documents) {
098                                    _indexWriter.addDocument(document);
099                            }
100    
101                            _batchCount++;
102                    }
103                    finally {
104                            _commit();
105                    }
106            }
107    
108            @Override
109            public void close() {
110                    if (SPIUtil.isSPI()) {
111                            return;
112                    }
113    
114                    try {
115                            _indexWriter.close();
116                    }
117                    catch (Exception e) {
118                            _log.error("Closing Lucene writer failed for " + _companyId, e);
119                    }
120            }
121    
122            @Override
123            public void delete() {
124                    if (SearchEngineUtil.isIndexReadOnly()) {
125                            return;
126                    }
127    
128                    _deleteDirectory();
129            }
130    
131            @Override
132            public void deleteDocuments(Term term) throws IOException {
133                    if (SearchEngineUtil.isIndexReadOnly()) {
134                            return;
135                    }
136    
137                    try {
138                            _indexWriter.deleteDocuments(term);
139    
140                            _batchCount++;
141                    }
142                    finally {
143                            _commit();
144                    }
145            }
146    
147            @Override
148            public void dumpIndex(OutputStream outputStream) throws IOException {
149                    _dumpIndexDeletionPolicy.dump(outputStream, _indexWriter, _commitLock);
150            }
151    
152            @Override
153            public long getCompanyId() {
154                    return _companyId;
155            }
156    
157            @Override
158            public long getLastGeneration() {
159                    return _dumpIndexDeletionPolicy.getLastGeneration();
160            }
161    
162            @Override
163            public Directory getLuceneDir() {
164                    if (_log.isDebugEnabled()) {
165                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
166                    }
167    
168                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
169                            return _getLuceneDirFile();
170                    }
171                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
172                                            _LUCENE_STORE_TYPE_JDBC)) {
173    
174                            throw new IllegalArgumentException(
175                                    "Store type JDBC is no longer supported in favor of SOLR");
176                    }
177                    else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
178                            return _getLuceneDirRam();
179                    }
180                    else {
181                            throw new RuntimeException(
182                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
183                    }
184            }
185    
186            @Override
187            public void loadIndex(InputStream inputStream) throws IOException {
188                    File tempFile = FileUtil.createTempFile();
189    
190                    Directory tempDirectory = FSDirectory.open(tempFile);
191    
192                    IndexCommitSerializationUtil.deserializeIndex(
193                            inputStream, tempDirectory);
194    
195                    _deleteDirectory();
196    
197                    IndexReader indexReader = IndexReader.open(tempDirectory, false);
198    
199                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
200    
201                    try {
202                            TopDocs topDocs = indexSearcher.search(
203                                    new MatchAllDocsQuery(), indexReader.numDocs());
204    
205                            ScoreDoc[] scoreDocs = topDocs.scoreDocs;
206    
207                            for (ScoreDoc scoreDoc : scoreDocs) {
208                                    Document document = indexSearcher.doc(scoreDoc.doc);
209    
210                                    addDocument(document);
211                            }
212                    }
213                    catch (IllegalArgumentException iae) {
214                            if (_log.isDebugEnabled()) {
215                                    _log.debug(iae.getMessage());
216                            }
217                    }
218    
219                    indexSearcher.close();
220    
221                    indexReader.flush();
222                    indexReader.close();
223    
224                    tempDirectory.close();
225    
226                    FileUtil.deltree(tempFile);
227            }
228    
229            @Override
230            public void updateDocument(Term term, Document document)
231                    throws IOException {
232    
233                    if (SearchEngineUtil.isIndexReadOnly()) {
234                            return;
235                    }
236    
237                    if (_log.isDebugEnabled()) {
238                            _log.debug("Indexing " + document);
239                    }
240    
241                    _write(term, document);
242            }
243    
244            private void _checkLuceneDir() {
245                    if (SearchEngineUtil.isIndexReadOnly()) {
246                            return;
247                    }
248    
249                    try {
250                            Directory directory = getLuceneDir();
251    
252                            if (IndexWriter.isLocked(directory)) {
253                                    IndexWriter.unlock(directory);
254                            }
255                    }
256                    catch (Exception e) {
257                            _log.error("Check Lucene directory failed for " + _companyId, e);
258                    }
259            }
260    
261            private void _commit() throws IOException {
262                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
263                            (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
264    
265                            _doCommit();
266                    }
267            }
268    
269            private void _deleteAll() {
270                    String path = _getPath();
271    
272                    try {
273                            _indexWriter.deleteAll();
274    
275                            _indexWriter.commit();
276                    }
277                    catch (Exception e) {
278                            if (_log.isWarnEnabled()) {
279                                    _log.warn("Unable to delete index in directory " + path);
280                            }
281                    }
282            }
283    
284            private void _deleteDirectory() {
285                    if (_log.isDebugEnabled()) {
286                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
287                    }
288    
289                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
290                            PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
291    
292                            _deleteAll();
293                    }
294                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
295                                            _LUCENE_STORE_TYPE_JDBC)) {
296    
297                            throw new IllegalArgumentException(
298                                    "Store type JDBC is no longer supported in favor of SOLR");
299                    }
300                    else {
301                            throw new RuntimeException(
302                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
303                    }
304            }
305    
306            private void _doCommit() throws IOException {
307                    if (_indexWriter != null) {
308                            _commitLock.lock();
309    
310                            try {
311                                    _indexWriter.commit();
312                            }
313                            finally {
314                                    _commitLock.unlock();
315                            }
316                    }
317    
318                    _batchCount = 0;
319            }
320    
321            private FSDirectory _getDirectory(String path) throws IOException {
322                    if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
323                            return new MMapDirectory(new File(path));
324                    }
325                    else {
326                            return FSDirectory.open(new File(path));
327                    }
328            }
329    
330            private Directory _getLuceneDirFile() {
331                    Directory directory = null;
332    
333                    String path = _getPath();
334    
335                    try {
336                            directory = _getDirectory(path);
337                    }
338                    catch (IOException ioe) {
339                            if (directory != null) {
340                                    try {
341                                            directory.close();
342                                    }
343                                    catch (Exception e) {
344                                    }
345                            }
346                    }
347    
348                    return directory;
349            }
350    
351            private Directory _getLuceneDirRam() {
352                    String path = _getPath();
353    
354                    Directory directory = _ramDirectories.get(path);
355    
356                    if (directory == null) {
357                            directory = new RAMDirectory();
358    
359                            _ramDirectories.put(path, directory);
360                    }
361    
362                    return directory;
363            }
364    
365            private MergePolicy _getMergePolicy() throws Exception {
366                    if (PropsValues.LUCENE_MERGE_POLICY.equals(
367                                    NoMergePolicy.class.getName())) {
368    
369                            return NoMergePolicy.NO_COMPOUND_FILES;
370                    }
371    
372                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
373    
374                    MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
375                            classLoader, PropsValues.LUCENE_MERGE_POLICY);
376    
377                    if (mergePolicy instanceof LogMergePolicy) {
378                            LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
379    
380                            logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
381                    }
382    
383                    return mergePolicy;
384            }
385    
386            private MergeScheduler _getMergeScheduler() throws Exception {
387                    if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
388                                    NoMergeScheduler.class.getName())) {
389    
390                            return NoMergeScheduler.INSTANCE;
391                    }
392    
393                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
394    
395                    return (MergeScheduler)InstanceFactory.newInstance(
396                            classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
397            }
398    
399            private String _getPath() {
400                    return PropsValues.LUCENE_DIR.concat(String.valueOf(_companyId)).concat(
401                            StringPool.SLASH);
402            }
403    
404            private void _initCommitScheduler() {
405                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
406                            (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
407    
408                            return;
409                    }
410    
411                    ScheduledExecutorService scheduledExecutorService =
412                            Executors.newSingleThreadScheduledExecutor();
413    
414                    Runnable runnable = new Runnable() {
415    
416                            @Override
417                            public void run() {
418                                    try {
419                                            if (_batchCount > 0) {
420                                                    _doCommit();
421                                            }
422                                    }
423                                    catch (IOException ioe) {
424                                            _log.error("Could not run scheduled commit", ioe);
425                                    }
426                            }
427    
428                    };
429    
430                    scheduledExecutorService.scheduleWithFixedDelay(
431                            runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
432                            TimeUnit.MILLISECONDS);
433            }
434    
435            private void _initIndexWriter() {
436                    try {
437                            Analyzer analyzer = new LimitTokenCountAnalyzer(
438                                    LuceneHelperUtil.getAnalyzer(),
439                                    PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
440    
441                            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
442                                    LuceneHelperUtil.getVersion(), analyzer);
443    
444                            indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
445                            indexWriterConfig.setMergePolicy(_getMergePolicy());
446                            indexWriterConfig.setMergeScheduler(_getMergeScheduler());
447                            indexWriterConfig.setRAMBufferSizeMB(
448                                    PropsValues.LUCENE_BUFFER_SIZE);
449    
450                            _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
451    
452                            if (!IndexReader.indexExists(getLuceneDir())) {
453    
454                                    // Workaround for LUCENE-2386
455    
456                                    if (_log.isDebugEnabled()) {
457                                            _log.debug("Creating missing index");
458                                    }
459    
460                                    _doCommit();
461                            }
462                    }
463                    catch (Exception e) {
464                            _log.error(
465                                    "Initializing Lucene writer failed for " + _companyId, e);
466                    }
467            }
468    
469            private void _write(Term term, Document document) throws IOException {
470                    try {
471                            if (term != null) {
472                                    _indexWriter.updateDocument(term, document);
473                            }
474                            else {
475                                    _indexWriter.addDocument(document);
476                            }
477    
478                            _batchCount++;
479                    }
480                    finally {
481                            _commit();
482                    }
483            }
484    
485            private static final String _LUCENE_STORE_TYPE_FILE = "file";
486    
487            private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
488    
489            private static final String _LUCENE_STORE_TYPE_RAM = "ram";
490    
491            private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
492    
493            private volatile int _batchCount;
494            private Lock _commitLock = new ReentrantLock();
495            private long _companyId;
496            private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
497                    new DumpIndexDeletionPolicy();
498            private IndexWriter _indexWriter;
499            private Map<String, Directory> _ramDirectories =
500                    new ConcurrentHashMap<String, Directory>();
501    
502    }