001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021 import com.liferay.portal.kernel.process.ProcessCallable;
022 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023 import com.liferay.portal.kernel.resiliency.spi.SPI;
024 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025 import com.liferay.portal.kernel.search.SearchEngineUtil;
026 import com.liferay.portal.kernel.util.FileUtil;
027 import com.liferay.portal.kernel.util.InstanceFactory;
028 import com.liferay.portal.kernel.util.StringPool;
029 import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
030 import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
031 import com.liferay.portal.util.ClassLoaderUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.io.File;
035 import java.io.IOException;
036 import java.io.InputStream;
037 import java.io.OutputStream;
038 import java.io.Serializable;
039
040 import java.util.Collection;
041 import java.util.Map;
042 import java.util.concurrent.ConcurrentHashMap;
043 import java.util.concurrent.Executors;
044 import java.util.concurrent.ScheduledExecutorService;
045 import java.util.concurrent.TimeUnit;
046 import java.util.concurrent.locks.Lock;
047 import java.util.concurrent.locks.ReentrantLock;
048
049 import org.apache.lucene.analysis.Analyzer;
050 import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
051 import org.apache.lucene.document.Document;
052 import org.apache.lucene.index.IndexReader;
053 import org.apache.lucene.index.IndexWriter;
054 import org.apache.lucene.index.IndexWriterConfig;
055 import org.apache.lucene.index.LogMergePolicy;
056 import org.apache.lucene.index.MergePolicy;
057 import org.apache.lucene.index.MergeScheduler;
058 import org.apache.lucene.index.NoMergePolicy;
059 import org.apache.lucene.index.NoMergeScheduler;
060 import org.apache.lucene.index.Term;
061 import org.apache.lucene.search.IndexSearcher;
062 import org.apache.lucene.search.MatchAllDocsQuery;
063 import org.apache.lucene.search.ScoreDoc;
064 import org.apache.lucene.search.TopDocs;
065 import org.apache.lucene.store.Directory;
066 import org.apache.lucene.store.FSDirectory;
067 import org.apache.lucene.store.MMapDirectory;
068 import org.apache.lucene.store.RAMDirectory;
069
070
077 public class IndexAccessorImpl implements IndexAccessor {
078
079 public IndexAccessorImpl(long companyId) {
080 _companyId = companyId;
081
082 _path = PropsValues.LUCENE_DIR.concat(
083 String.valueOf(_companyId)).concat(StringPool.SLASH);
084
085 try {
086 if (!SPIUtil.isSPI()) {
087 _checkLuceneDir();
088 _initIndexWriter();
089 _initCommitScheduler();
090
091 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
092 }
093 else {
094 _indexSearcherManager = new IndexSearcherManager(
095 getLuceneDir());
096 }
097 }
098 catch (IOException ioe) {
099 _log.error(
100 "Unable to initialize index searcher manager for company " +
101 _companyId,
102 ioe);
103 }
104 }
105
106 @Override
107 public IndexSearcher acquireIndexSearcher() throws IOException {
108 return _indexSearcherManager.acquire();
109 }
110
111 @Override
112 public void addDocument(Document document) throws IOException {
113 if (SearchEngineUtil.isIndexReadOnly()) {
114 return;
115 }
116
117 _write(null, document);
118 }
119
120 @Override
121 public void addDocuments(Collection<Document> documents)
122 throws IOException {
123
124 try {
125 for (Document document : documents) {
126 _indexWriter.addDocument(document);
127 }
128
129 _batchCount++;
130 }
131 finally {
132 _commit();
133 }
134 }
135
136 @Override
137 public void close() {
138 if (SPIUtil.isSPI()) {
139 return;
140 }
141
142 try {
143 _indexSearcherManager.close();
144
145 _indexWriter.close();
146
147 _directory.close();
148 }
149 catch (Exception e) {
150 _log.error("Closing Lucene writer failed for " + _companyId, e);
151 }
152
153 if (_scheduledExecutorService != null) {
154 _scheduledExecutorService.shutdownNow();
155 }
156 }
157
158 @Override
159 public void delete() {
160 if (SearchEngineUtil.isIndexReadOnly()) {
161 return;
162 }
163
164 _deleteDirectory();
165 }
166
167 @Override
168 public void deleteDocuments(Term term) throws IOException {
169 if (SearchEngineUtil.isIndexReadOnly()) {
170 return;
171 }
172
173 try {
174 _indexWriter.deleteDocuments(term);
175
176 _batchCount++;
177 }
178 finally {
179 _commit();
180 }
181 }
182
183 @Override
184 public void dumpIndex(OutputStream outputStream) throws IOException {
185 try {
186 _dumpIndexDeletionPolicy.dump(
187 outputStream, _indexWriter, _commitLock);
188 }
189 finally {
190 _indexSearcherManager.invalidate();
191 }
192 }
193
194 @Override
195 public long getCompanyId() {
196 return _companyId;
197 }
198
199 @Override
200 public long getLastGeneration() {
201 return _dumpIndexDeletionPolicy.getLastGeneration();
202 }
203
204 @Override
205 public Directory getLuceneDir() {
206 if (_directory != null) {
207 return _directory;
208 }
209
210 if (_log.isDebugEnabled()) {
211 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
212 }
213
214 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
215 _directory = _getLuceneDirFile();
216 }
217 else if (PropsValues.LUCENE_STORE_TYPE.equals(
218 _LUCENE_STORE_TYPE_JDBC)) {
219
220 throw new IllegalArgumentException(
221 "Store type JDBC is no longer supported in favor of SOLR");
222 }
223 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
224 _directory = new RAMDirectory();
225 }
226 else {
227 throw new RuntimeException(
228 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
229 }
230
231 return _directory;
232 }
233
234 @Override
235 public void invalidate() {
236 _indexSearcherManager.invalidate();
237 }
238
239 @Override
240 public void loadIndex(InputStream inputStream) throws IOException {
241 File tempFile = FileUtil.createTempFile();
242
243 Directory tempDirectory = FSDirectory.open(tempFile);
244
245 IndexCommitSerializationUtil.deserializeIndex(
246 inputStream, tempDirectory);
247
248 _deleteDirectory();
249
250 IndexReader indexReader = IndexReader.open(tempDirectory, false);
251
252 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
253
254 try {
255 TopDocs topDocs = indexSearcher.search(
256 new MatchAllDocsQuery(), indexReader.numDocs());
257
258 ScoreDoc[] scoreDocs = topDocs.scoreDocs;
259
260 for (ScoreDoc scoreDoc : scoreDocs) {
261 Document document = indexSearcher.doc(scoreDoc.doc);
262
263 addDocument(document);
264 }
265 }
266 catch (IllegalArgumentException iae) {
267 if (_log.isDebugEnabled()) {
268 _log.debug(iae.getMessage());
269 }
270 }
271
272 indexSearcher.close();
273
274 indexReader.flush();
275 indexReader.close();
276
277 tempDirectory.close();
278
279 FileUtil.deltree(tempFile);
280 }
281
282 @Override
283 public void releaseIndexSearcher(IndexSearcher indexSearcher)
284 throws IOException {
285
286 _indexSearcherManager.release(indexSearcher);
287 }
288
289 @Override
290 public void updateDocument(Term term, Document document)
291 throws IOException {
292
293 if (SearchEngineUtil.isIndexReadOnly()) {
294 return;
295 }
296
297 if (_log.isDebugEnabled()) {
298 _log.debug("Indexing " + document);
299 }
300
301 _write(term, document);
302 }
303
304 private static void _invalidate(long companyId) {
305 for (SPI spi : MPIHelperUtil.getSPIs()) {
306 try {
307 RegistrationReference registrationReference =
308 spi.getRegistrationReference();
309
310 IntrabandRPCUtil.execute(
311 registrationReference,
312 new InvalidateProcessCallable(companyId));
313 }
314 catch (Exception e) {
315 _log.error(
316 "Unable to invalidate SPI " + spi + " for company " +
317 companyId,
318 e);
319 }
320 }
321 }
322
323 private void _checkLuceneDir() {
324 if (SearchEngineUtil.isIndexReadOnly()) {
325 return;
326 }
327
328 try {
329 Directory directory = getLuceneDir();
330
331 if (IndexWriter.isLocked(directory)) {
332 IndexWriter.unlock(directory);
333 }
334 }
335 catch (Exception e) {
336 _log.error("Check Lucene directory failed for " + _companyId, e);
337 }
338 }
339
340 private void _commit() throws IOException {
341 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
342 (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
343
344 _doCommit();
345 }
346 }
347
348 private void _deleteAll() {
349 try {
350 _indexWriter.deleteAll();
351
352 _doCommit();
353 }
354 catch (Exception e) {
355 if (_log.isWarnEnabled()) {
356 _log.warn("Unable to delete index in directory " + _path);
357 }
358 }
359 }
360
361 private void _deleteDirectory() {
362 if (_log.isDebugEnabled()) {
363 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
364 }
365
366 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
367 PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
368
369 _deleteAll();
370 }
371 else if (PropsValues.LUCENE_STORE_TYPE.equals(
372 _LUCENE_STORE_TYPE_JDBC)) {
373
374 throw new IllegalArgumentException(
375 "Store type JDBC is no longer supported in favor of SOLR");
376 }
377 else {
378 throw new RuntimeException(
379 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
380 }
381 }
382
383 private void _doCommit() throws IOException {
384 if (_indexWriter != null) {
385 _commitLock.lock();
386
387 try {
388 _indexWriter.commit();
389 }
390 finally {
391 _commitLock.unlock();
392
393 _indexSearcherManager.invalidate();
394
395 _invalidate(_companyId);
396 }
397 }
398
399 _batchCount = 0;
400 }
401
402 private Directory _getLuceneDirFile() {
403 Directory directory = null;
404
405 try {
406 if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
407 directory = new MMapDirectory(new File(_path));
408 }
409 else {
410 directory = FSDirectory.open(new File(_path));
411 }
412 }
413 catch (IOException ioe) {
414 if (directory != null) {
415 try {
416 directory.close();
417 }
418 catch (Exception e) {
419 }
420 }
421 }
422
423 return directory;
424 }
425
426 private MergePolicy _getMergePolicy() throws Exception {
427 if (PropsValues.LUCENE_MERGE_POLICY.equals(
428 NoMergePolicy.class.getName())) {
429
430 return NoMergePolicy.NO_COMPOUND_FILES;
431 }
432
433 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
434
435 MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
436 classLoader, PropsValues.LUCENE_MERGE_POLICY);
437
438 if (mergePolicy instanceof LogMergePolicy) {
439 LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
440
441 logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
442 }
443
444 return mergePolicy;
445 }
446
447 private MergeScheduler _getMergeScheduler() throws Exception {
448 if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
449 NoMergeScheduler.class.getName())) {
450
451 return NoMergeScheduler.INSTANCE;
452 }
453
454 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
455
456 return (MergeScheduler)InstanceFactory.newInstance(
457 classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
458 }
459
460 private void _initCommitScheduler() {
461 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
462 (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
463
464 return;
465 }
466
467 _scheduledExecutorService =
468 Executors.newSingleThreadScheduledExecutor();
469
470 Runnable runnable = new Runnable() {
471
472 @Override
473 public void run() {
474 try {
475 if (_batchCount > 0) {
476 _doCommit();
477 }
478 }
479 catch (IOException ioe) {
480 _log.error("Could not run scheduled commit", ioe);
481 }
482 }
483
484 };
485
486 _scheduledExecutorService.scheduleWithFixedDelay(
487 runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
488 TimeUnit.MILLISECONDS);
489 }
490
491 private void _initIndexWriter() {
492 try {
493 Analyzer analyzer = new LimitTokenCountAnalyzer(
494 LuceneHelperUtil.getAnalyzer(),
495 PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
496
497 IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
498 LuceneHelperUtil.getVersion(), analyzer);
499
500 indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
501 indexWriterConfig.setMergePolicy(_getMergePolicy());
502 indexWriterConfig.setMergeScheduler(_getMergeScheduler());
503 indexWriterConfig.setRAMBufferSizeMB(
504 PropsValues.LUCENE_BUFFER_SIZE);
505
506 _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
507
508 if (!IndexReader.indexExists(getLuceneDir())) {
509
510
511
512 if (_log.isDebugEnabled()) {
513 _log.debug("Creating missing index");
514 }
515
516 _indexWriter.commit();
517 }
518 }
519 catch (Exception e) {
520 _log.error(
521 "Initializing Lucene writer failed for " + _companyId, e);
522 }
523 }
524
525 private void _write(Term term, Document document) throws IOException {
526 try {
527 if (term != null) {
528 _indexWriter.updateDocument(term, document);
529 }
530 else {
531 _indexWriter.addDocument(document);
532 }
533
534 _batchCount++;
535 }
536 finally {
537 _commit();
538 }
539 }
540
541 private static final String _LUCENE_STORE_TYPE_FILE = "file";
542
543 private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
544
545 private static final String _LUCENE_STORE_TYPE_RAM = "ram";
546
547 private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
548
549 private volatile int _batchCount;
550 private Lock _commitLock = new ReentrantLock();
551 private long _companyId;
552 private Directory _directory;
553 private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
554 new DumpIndexDeletionPolicy();
555 private IndexSearcherManager _indexSearcherManager;
556 private IndexWriter _indexWriter;
557 private String _path;
558 private Map<String, Directory> _ramDirectories =
559 new ConcurrentHashMap<String, Directory>();
560 private ScheduledExecutorService _scheduledExecutorService;
561
562 private static class InvalidateProcessCallable
563 implements ProcessCallable<Serializable> {
564
565 public InvalidateProcessCallable(long companyId) {
566 _companyId = companyId;
567 }
568
569 @Override
570 public Serializable call() {
571 IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
572 _companyId);
573
574 indexAccessor.invalidate();
575
576 _invalidate(_companyId);
577
578 return null;
579 }
580
581 private static final long serialVersionUID = 1L;
582
583 private final long _companyId;
584
585 }
586
587 }