001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021 import com.liferay.portal.kernel.process.ProcessCallable;
022 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023 import com.liferay.portal.kernel.resiliency.spi.SPI;
024 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025 import com.liferay.portal.kernel.search.SearchEngineUtil;
026 import com.liferay.portal.kernel.util.FileUtil;
027 import com.liferay.portal.kernel.util.InstanceFactory;
028 import com.liferay.portal.kernel.util.StringPool;
029 import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
030 import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
031 import com.liferay.portal.util.ClassLoaderUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.io.File;
035 import java.io.IOException;
036 import java.io.InputStream;
037 import java.io.OutputStream;
038 import java.io.Serializable;
039
040 import java.util.Collection;
041 import java.util.concurrent.Executors;
042 import java.util.concurrent.ScheduledExecutorService;
043 import java.util.concurrent.TimeUnit;
044 import java.util.concurrent.locks.Lock;
045 import java.util.concurrent.locks.ReentrantLock;
046
047 import org.apache.lucene.analysis.Analyzer;
048 import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
049 import org.apache.lucene.document.Document;
050 import org.apache.lucene.index.IndexReader;
051 import org.apache.lucene.index.IndexWriter;
052 import org.apache.lucene.index.IndexWriterConfig;
053 import org.apache.lucene.index.LogMergePolicy;
054 import org.apache.lucene.index.MergePolicy;
055 import org.apache.lucene.index.MergeScheduler;
056 import org.apache.lucene.index.NoMergePolicy;
057 import org.apache.lucene.index.NoMergeScheduler;
058 import org.apache.lucene.index.Term;
059 import org.apache.lucene.search.IndexSearcher;
060 import org.apache.lucene.search.MatchAllDocsQuery;
061 import org.apache.lucene.search.ScoreDoc;
062 import org.apache.lucene.search.TopDocs;
063 import org.apache.lucene.store.Directory;
064 import org.apache.lucene.store.FSDirectory;
065 import org.apache.lucene.store.MMapDirectory;
066 import org.apache.lucene.store.RAMDirectory;
067
068
075 public class IndexAccessorImpl implements IndexAccessor {
076
077 public IndexAccessorImpl(long companyId) {
078 _companyId = companyId;
079
080 _path = PropsValues.LUCENE_DIR + _companyId + StringPool.SLASH;
081
082 try {
083 if (!SPIUtil.isSPI()) {
084 _checkLuceneDir();
085 _initIndexWriter();
086 _initCommitScheduler();
087
088 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
089 }
090 else {
091 _indexSearcherManager = new IndexSearcherManager(
092 getLuceneDir());
093 }
094 }
095 catch (IOException ioe) {
096 _log.error(
097 "Unable to initialize index searcher manager for company " +
098 _companyId,
099 ioe);
100 }
101 }
102
103 @Override
104 public IndexSearcher acquireIndexSearcher() throws IOException {
105 return _indexSearcherManager.acquire();
106 }
107
108 @Override
109 public void addDocument(Document document) throws IOException {
110 if (SearchEngineUtil.isIndexReadOnly()) {
111 return;
112 }
113
114 _write(null, document);
115 }
116
117 @Override
118 public void addDocuments(Collection<Document> documents)
119 throws IOException {
120
121 try {
122 for (Document document : documents) {
123 _indexWriter.addDocument(document);
124 }
125
126 _batchCount++;
127 }
128 finally {
129 _commit();
130 }
131 }
132
133 @Override
134 public void close() {
135 if (SPIUtil.isSPI()) {
136 return;
137 }
138
139 try {
140 _indexSearcherManager.close();
141
142 _indexWriter.close();
143
144 _directory.close();
145 }
146 catch (Exception e) {
147 _log.error("Closing Lucene writer failed for " + _companyId, e);
148 }
149
150 if (_scheduledExecutorService != null) {
151 _scheduledExecutorService.shutdownNow();
152 }
153 }
154
155 @Override
156 public void delete() {
157 if (SearchEngineUtil.isIndexReadOnly()) {
158 return;
159 }
160
161 _deleteDirectory();
162 }
163
164 @Override
165 public void deleteDocuments(Term term) throws IOException {
166 if (SearchEngineUtil.isIndexReadOnly()) {
167 return;
168 }
169
170 try {
171 _indexWriter.deleteDocuments(term);
172
173 _batchCount++;
174 }
175 finally {
176 _commit();
177 }
178 }
179
180 @Override
181 public void dumpIndex(OutputStream outputStream) throws IOException {
182 try {
183 _dumpIndexDeletionPolicy.dump(
184 outputStream, _indexWriter, _commitLock);
185 }
186 finally {
187 _indexSearcherManager.invalidate();
188 }
189 }
190
191 @Override
192 public long getCompanyId() {
193 return _companyId;
194 }
195
196 @Override
197 public long getLastGeneration() {
198 return _dumpIndexDeletionPolicy.getLastGeneration();
199 }
200
201 @Override
202 public Directory getLuceneDir() {
203 if (_directory != null) {
204 return _directory;
205 }
206
207 if (_log.isDebugEnabled()) {
208 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
209 }
210
211 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
212 _directory = _getLuceneDirFile();
213 }
214 else if (PropsValues.LUCENE_STORE_TYPE.equals(
215 _LUCENE_STORE_TYPE_JDBC)) {
216
217 throw new IllegalArgumentException(
218 "Store type JDBC is no longer supported in favor of SOLR");
219 }
220 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
221 _directory = new RAMDirectory();
222 }
223 else {
224 throw new RuntimeException(
225 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
226 }
227
228 return _directory;
229 }
230
231 @Override
232 public void invalidate() {
233 _indexSearcherManager.invalidate();
234 }
235
236 @Override
237 public void loadIndex(InputStream inputStream) throws IOException {
238 File tempFile = FileUtil.createTempFile();
239
240 try (Directory tempDirectory = FSDirectory.open(tempFile)) {
241 IndexCommitSerializationUtil.deserializeIndex(
242 inputStream, tempDirectory);
243
244 _deleteDirectory();
245
246 try (IndexReader indexReader = IndexReader.open(
247 tempDirectory, false)) {
248
249 if (indexReader.numDocs() > 0) {
250 try (IndexSearcher indexSearcher = new IndexSearcher(
251 indexReader)) {
252
253 TopDocs topDocs = indexSearcher.search(
254 new MatchAllDocsQuery(), indexReader.numDocs());
255
256 ScoreDoc[] scoreDocs = topDocs.scoreDocs;
257
258 for (ScoreDoc scoreDoc : scoreDocs) {
259 Document document = indexSearcher.doc(scoreDoc.doc);
260
261 addDocument(document);
262 }
263 }
264 catch (IllegalArgumentException iae) {
265 if (_log.isDebugEnabled()) {
266 _log.debug(iae.getMessage());
267 }
268 }
269 }
270
271 indexReader.flush();
272 }
273 }
274
275 FileUtil.deltree(tempFile);
276 }
277
278 @Override
279 public void releaseIndexSearcher(IndexSearcher indexSearcher)
280 throws IOException {
281
282 _indexSearcherManager.release(indexSearcher);
283 }
284
285 @Override
286 public void updateDocument(Term term, Document document)
287 throws IOException {
288
289 if (SearchEngineUtil.isIndexReadOnly()) {
290 return;
291 }
292
293 if (_log.isDebugEnabled()) {
294 _log.debug("Indexing " + document);
295 }
296
297 _write(term, document);
298 }
299
300 private static void _invalidate(long companyId) {
301 for (SPI spi : MPIHelperUtil.getSPIs()) {
302 try {
303 RegistrationReference registrationReference =
304 spi.getRegistrationReference();
305
306 IntrabandRPCUtil.execute(
307 registrationReference,
308 new InvalidateProcessCallable(companyId));
309 }
310 catch (Exception e) {
311 _log.error(
312 "Unable to invalidate SPI " + spi + " for company " +
313 companyId,
314 e);
315 }
316 }
317 }
318
319 private void _checkLuceneDir() {
320 if (SearchEngineUtil.isIndexReadOnly()) {
321 return;
322 }
323
324 try {
325 Directory directory = getLuceneDir();
326
327 if (IndexWriter.isLocked(directory)) {
328 IndexWriter.unlock(directory);
329 }
330 }
331 catch (Exception e) {
332 _log.error("Check Lucene directory failed for " + _companyId, e);
333 }
334 }
335
336 private void _commit() throws IOException {
337 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
338 (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
339
340 _doCommit();
341 }
342 }
343
344 private void _deleteAll() {
345 try {
346 _indexWriter.deleteAll();
347
348 _doCommit();
349 }
350 catch (Exception e) {
351 if (_log.isWarnEnabled()) {
352 _log.warn("Unable to delete index in directory " + _path);
353 }
354 }
355 }
356
357 private void _deleteDirectory() {
358 if (_log.isDebugEnabled()) {
359 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
360 }
361
362 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
363 PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
364
365 _deleteAll();
366 }
367 else if (PropsValues.LUCENE_STORE_TYPE.equals(
368 _LUCENE_STORE_TYPE_JDBC)) {
369
370 throw new IllegalArgumentException(
371 "Store type JDBC is no longer supported in favor of SOLR");
372 }
373 else {
374 throw new RuntimeException(
375 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
376 }
377 }
378
379 private void _doCommit() throws IOException {
380 if (_indexWriter != null) {
381 _commitLock.lock();
382
383 try {
384 _indexWriter.commit();
385 }
386 finally {
387 _commitLock.unlock();
388
389 _indexSearcherManager.invalidate();
390
391 _invalidate(_companyId);
392 }
393 }
394
395 _batchCount = 0;
396 }
397
398 private Directory _getLuceneDirFile() {
399 Directory directory = null;
400
401 try {
402 if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
403 directory = new MMapDirectory(new File(_path));
404 }
405 else {
406 directory = FSDirectory.open(new File(_path));
407 }
408 }
409 catch (IOException ioe) {
410 if (directory != null) {
411 try {
412 directory.close();
413 }
414 catch (Exception e) {
415 }
416 }
417 }
418
419 return directory;
420 }
421
422 private MergePolicy _getMergePolicy() throws Exception {
423 if (PropsValues.LUCENE_MERGE_POLICY.equals(
424 NoMergePolicy.class.getName())) {
425
426 return NoMergePolicy.NO_COMPOUND_FILES;
427 }
428
429 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
430
431 MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
432 classLoader, PropsValues.LUCENE_MERGE_POLICY);
433
434 if (mergePolicy instanceof LogMergePolicy) {
435 LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
436
437 logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
438 }
439
440 return mergePolicy;
441 }
442
443 private MergeScheduler _getMergeScheduler() throws Exception {
444 if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
445 NoMergeScheduler.class.getName())) {
446
447 return NoMergeScheduler.INSTANCE;
448 }
449
450 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
451
452 return (MergeScheduler)InstanceFactory.newInstance(
453 classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
454 }
455
456 private void _initCommitScheduler() {
457 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
458 (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
459
460 return;
461 }
462
463 _scheduledExecutorService =
464 Executors.newSingleThreadScheduledExecutor();
465
466 Runnable runnable = new Runnable() {
467
468 @Override
469 public void run() {
470 try {
471 if (_batchCount > 0) {
472 _doCommit();
473 }
474 }
475 catch (IOException ioe) {
476 _log.error("Could not run scheduled commit", ioe);
477 }
478 }
479
480 };
481
482 _scheduledExecutorService.scheduleWithFixedDelay(
483 runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
484 TimeUnit.MILLISECONDS);
485 }
486
487 private void _initIndexWriter() {
488 try {
489 Analyzer analyzer = new LimitTokenCountAnalyzer(
490 LuceneHelperUtil.getAnalyzer(),
491 PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
492
493 IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
494 LuceneHelperUtil.getVersion(), analyzer);
495
496 indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
497 indexWriterConfig.setMergePolicy(_getMergePolicy());
498 indexWriterConfig.setMergeScheduler(_getMergeScheduler());
499 indexWriterConfig.setRAMBufferSizeMB(
500 PropsValues.LUCENE_BUFFER_SIZE);
501
502 _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
503
504 if (!IndexReader.indexExists(getLuceneDir())) {
505
506
507
508 if (_log.isDebugEnabled()) {
509 _log.debug("Creating missing index");
510 }
511
512 _indexWriter.commit();
513 }
514 }
515 catch (Exception e) {
516 _log.error(
517 "Initializing Lucene writer failed for " + _companyId, e);
518 }
519 }
520
521 private void _write(Term term, Document document) throws IOException {
522 try {
523 if (term != null) {
524 _indexWriter.updateDocument(term, document);
525 }
526 else {
527 _indexWriter.addDocument(document);
528 }
529
530 _batchCount++;
531 }
532 finally {
533 _commit();
534 }
535 }
536
537 private static final String _LUCENE_STORE_TYPE_FILE = "file";
538
539 private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
540
541 private static final String _LUCENE_STORE_TYPE_RAM = "ram";
542
543 private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
544
545 private volatile int _batchCount;
546 private Lock _commitLock = new ReentrantLock();
547 private long _companyId;
548 private Directory _directory;
549 private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
550 new DumpIndexDeletionPolicy();
551 private IndexSearcherManager _indexSearcherManager;
552 private IndexWriter _indexWriter;
553 private String _path;
554 private ScheduledExecutorService _scheduledExecutorService;
555
556 private static class InvalidateProcessCallable
557 implements ProcessCallable<Serializable> {
558
559 public InvalidateProcessCallable(long companyId) {
560 _companyId = companyId;
561 }
562
563 @Override
564 public Serializable call() {
565 IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
566 _companyId);
567
568 indexAccessor.invalidate();
569
570 _invalidate(_companyId);
571
572 return null;
573 }
574
575 private static final long serialVersionUID = 1L;
576
577 private final long _companyId;
578
579 }
580
581 }