001
014
015 package com.liferay.portal.search.lucene;
016
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
import com.liferay.portal.kernel.process.ProcessCallable;
import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
import com.liferay.portal.kernel.resiliency.spi.SPI;
import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
import com.liferay.portal.kernel.search.SearchEngineUtil;
import com.liferay.portal.kernel.util.FileUtil;
import com.liferay.portal.kernel.util.InstanceFactory;
import com.liferay.portal.kernel.util.OSDetector;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
import com.liferay.portal.util.ClassLoaderUtil;
import com.liferay.portal.util.PropsValues;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NoMergeScheduler;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RAMDirectory;
067
068
075 public class IndexAccessorImpl implements IndexAccessor {
076
077 public IndexAccessorImpl(long companyId) {
078 _companyId = companyId;
079
080 _path = PropsValues.LUCENE_DIR.concat(
081 String.valueOf(_companyId)).concat(StringPool.SLASH);
082
083 try {
084 if (!SPIUtil.isSPI()) {
085 _checkLuceneDir();
086 _initIndexWriter();
087 _initCommitScheduler();
088
089 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
090 }
091 else {
092 _indexSearcherManager = new IndexSearcherManager(
093 getLuceneDir());
094 }
095 }
096 catch (IOException ioe) {
097 _log.error(
098 "Unable to initialize index searcher manager for company " +
099 _companyId,
100 ioe);
101 }
102 }
103
104 @Override
105 public IndexSearcher acquireIndexSearcher() throws IOException {
106 return _indexSearcherManager.acquire();
107 }
108
109 @Override
110 public void addDocument(Document document) throws IOException {
111 if (SearchEngineUtil.isIndexReadOnly()) {
112 return;
113 }
114
115 _write(null, document);
116 }
117
118 @Override
119 public void addDocuments(Collection<Document> documents)
120 throws IOException {
121
122 try {
123 for (Document document : documents) {
124 _indexWriter.addDocument(document);
125 }
126
127 _batchCount++;
128 }
129 finally {
130 _commit();
131 }
132 }
133
134 @Override
135 public void close() {
136 if (SPIUtil.isSPI()) {
137 return;
138 }
139
140 try {
141 _indexSearcherManager.close();
142
143 _indexWriter.close();
144
145 _directory.close();
146 }
147 catch (Exception e) {
148 _log.error("Closing Lucene writer failed for " + _companyId, e);
149 }
150
151 if (_scheduledExecutorService != null) {
152 _scheduledExecutorService.shutdownNow();
153 }
154 }
155
156 @Override
157 public void delete() {
158 if (SearchEngineUtil.isIndexReadOnly()) {
159 return;
160 }
161
162 _deleteDirectory();
163 }
164
165 @Override
166 public void deleteDocuments(Term term) throws IOException {
167 if (SearchEngineUtil.isIndexReadOnly()) {
168 return;
169 }
170
171 try {
172 _indexWriter.deleteDocuments(term);
173
174 _batchCount++;
175 }
176 finally {
177 _commit();
178 }
179 }
180
181 @Override
182 public void dumpIndex(OutputStream outputStream) throws IOException {
183 try {
184 _dumpIndexDeletionPolicy.dump(
185 outputStream, _indexWriter, _commitLock);
186 }
187 finally {
188 _indexSearcherManager.invalidate();
189 }
190 }
191
192 @Override
193 public long getCompanyId() {
194 return _companyId;
195 }
196
197 @Override
198 public long getLastGeneration() {
199 return _dumpIndexDeletionPolicy.getLastGeneration();
200 }
201
202 @Override
203 public Directory getLuceneDir() {
204 if (_directory != null) {
205 return _directory;
206 }
207
208 if (_log.isDebugEnabled()) {
209 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
210 }
211
212 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
213 _directory = _getLuceneDirFile();
214 }
215 else if (PropsValues.LUCENE_STORE_TYPE.equals(
216 _LUCENE_STORE_TYPE_JDBC)) {
217
218 throw new IllegalArgumentException(
219 "Store type JDBC is no longer supported in favor of SOLR");
220 }
221 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
222 _directory = new RAMDirectory();
223 }
224 else {
225 throw new RuntimeException(
226 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
227 }
228
229 return _directory;
230 }
231
232 @Override
233 public void invalidate() {
234 _indexSearcherManager.invalidate();
235 }
236
237 @Override
238 public void loadIndex(InputStream inputStream) throws IOException {
239 File tempFile = FileUtil.createTempFile();
240
241 Directory tempDirectory = FSDirectory.open(tempFile);
242
243 IndexCommitSerializationUtil.deserializeIndex(
244 inputStream, tempDirectory);
245
246 _indexSearcherManager.close();
247
248 _indexWriter.close();
249
250 Directory luceneDirectory = getLuceneDir();
251
252 for (String fileName : luceneDirectory.listAll()) {
253 luceneDirectory.deleteFile(fileName);
254 }
255
256 for (String fileName : tempDirectory.listAll()) {
257 tempDirectory.copy(getLuceneDir(), fileName, fileName);
258 }
259
260 _initIndexWriter();
261
262 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
263
264 tempDirectory.close();
265
266 FileUtil.deltree(tempFile);
267 }
268
269 @Override
270 public void releaseIndexSearcher(IndexSearcher indexSearcher)
271 throws IOException {
272
273 _indexSearcherManager.release(indexSearcher);
274 }
275
276 @Override
277 public void updateDocument(Term term, Document document)
278 throws IOException {
279
280 if (SearchEngineUtil.isIndexReadOnly()) {
281 return;
282 }
283
284 if (_log.isDebugEnabled()) {
285 _log.debug("Indexing " + document);
286 }
287
288 _write(term, document);
289 }
290
291 private static void _invalidate(long companyId) {
292 for (SPI spi : MPIHelperUtil.getSPIs()) {
293 try {
294 RegistrationReference registrationReference =
295 spi.getRegistrationReference();
296
297 IntrabandRPCUtil.execute(
298 registrationReference,
299 new InvalidateProcessCallable(companyId));
300 }
301 catch (Exception e) {
302 _log.error(
303 "Unable to invalidate SPI " + spi + " for company " +
304 companyId,
305 e);
306 }
307 }
308 }
309
310 private void _checkLuceneDir() {
311 if (SearchEngineUtil.isIndexReadOnly()) {
312 return;
313 }
314
315 try {
316 Directory directory = getLuceneDir();
317
318 if (IndexWriter.isLocked(directory)) {
319 IndexWriter.unlock(directory);
320 }
321 }
322 catch (Exception e) {
323 _log.error("Check Lucene directory failed for " + _companyId, e);
324 }
325 }
326
327 private void _commit() throws IOException {
328 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
329 (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
330
331 _doCommit();
332 }
333 }
334
335 private void _deleteAll() {
336 try {
337 _indexWriter.deleteAll();
338
339 _doCommit();
340 }
341 catch (Exception e) {
342 if (_log.isWarnEnabled()) {
343 _log.warn("Unable to delete index in directory " + _path);
344 }
345 }
346 }
347
348 private void _deleteDirectory() {
349 if (_log.isDebugEnabled()) {
350 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
351 }
352
353 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
354 PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
355
356 _deleteAll();
357 }
358 else if (PropsValues.LUCENE_STORE_TYPE.equals(
359 _LUCENE_STORE_TYPE_JDBC)) {
360
361 throw new IllegalArgumentException(
362 "Store type JDBC is no longer supported in favor of SOLR");
363 }
364 else {
365 throw new RuntimeException(
366 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
367 }
368 }
369
370 private void _doCommit() throws IOException {
371 if (_indexWriter != null) {
372 _commitLock.lock();
373
374 try {
375 _indexWriter.commit();
376 }
377 finally {
378 _commitLock.unlock();
379
380 _indexSearcherManager.invalidate();
381
382 _invalidate(_companyId);
383 }
384 }
385
386 _batchCount = 0;
387 }
388
389 private Directory _getLuceneDirFile() {
390 Directory directory = null;
391
392 try {
393 if (!OSDetector.isWindows() &&
394 PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
395
396 directory = new MMapDirectory(new File(_path));
397 }
398 else {
399 directory = FSDirectory.open(new File(_path));
400 }
401 }
402 catch (IOException ioe) {
403 if (directory != null) {
404 try {
405 directory.close();
406 }
407 catch (Exception e) {
408 }
409 }
410 }
411
412 return directory;
413 }
414
415 private MergePolicy _getMergePolicy() throws Exception {
416 if (PropsValues.LUCENE_MERGE_POLICY.equals(
417 NoMergePolicy.class.getName())) {
418
419 return NoMergePolicy.NO_COMPOUND_FILES;
420 }
421
422 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
423
424 MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
425 classLoader, PropsValues.LUCENE_MERGE_POLICY);
426
427 if (mergePolicy instanceof LogMergePolicy) {
428 LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
429
430 logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
431 }
432
433 return mergePolicy;
434 }
435
436 private MergeScheduler _getMergeScheduler() throws Exception {
437 if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
438 NoMergeScheduler.class.getName())) {
439
440 return NoMergeScheduler.INSTANCE;
441 }
442
443 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
444
445 return (MergeScheduler)InstanceFactory.newInstance(
446 classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
447 }
448
449 private void _initCommitScheduler() {
450 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
451 (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
452
453 return;
454 }
455
456 _scheduledExecutorService =
457 Executors.newSingleThreadScheduledExecutor();
458
459 Runnable runnable = new Runnable() {
460
461 @Override
462 public void run() {
463 try {
464 if (_batchCount > 0) {
465 _doCommit();
466 }
467 }
468 catch (IOException ioe) {
469 _log.error("Could not run scheduled commit", ioe);
470 }
471 }
472
473 };
474
475 _scheduledExecutorService.scheduleWithFixedDelay(
476 runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
477 TimeUnit.MILLISECONDS);
478 }
479
480 private void _initIndexWriter() {
481 try {
482 Analyzer analyzer = new LimitTokenCountAnalyzer(
483 LuceneHelperUtil.getAnalyzer(),
484 PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
485
486 IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
487 LuceneHelperUtil.getVersion(), analyzer);
488
489 indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
490 indexWriterConfig.setMergePolicy(_getMergePolicy());
491 indexWriterConfig.setMergeScheduler(_getMergeScheduler());
492 indexWriterConfig.setRAMBufferSizeMB(
493 PropsValues.LUCENE_BUFFER_SIZE);
494
495 _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
496
497 if (!IndexReader.indexExists(getLuceneDir())) {
498
499
500
501 if (_log.isDebugEnabled()) {
502 _log.debug("Creating missing index");
503 }
504
505 _indexWriter.commit();
506 }
507 }
508 catch (Exception e) {
509 _log.error(
510 "Initializing Lucene writer failed for " + _companyId, e);
511 }
512 }
513
514 private void _write(Term term, Document document) throws IOException {
515 try {
516 if (term != null) {
517 _indexWriter.updateDocument(term, document);
518 }
519 else {
520 _indexWriter.addDocument(document);
521 }
522
523 _batchCount++;
524 }
525 finally {
526 _commit();
527 }
528 }
529
530 private static final String _LUCENE_STORE_TYPE_FILE = "file";
531
532 private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
533
534 private static final String _LUCENE_STORE_TYPE_RAM = "ram";
535
536 private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
537
538 private volatile int _batchCount;
539 private Lock _commitLock = new ReentrantLock();
540 private long _companyId;
541 private Directory _directory;
542 private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
543 new DumpIndexDeletionPolicy();
544 private IndexSearcherManager _indexSearcherManager;
545 private IndexWriter _indexWriter;
546 private String _path;
547 private Map<String, Directory> _ramDirectories =
548 new ConcurrentHashMap<String, Directory>();
549 private ScheduledExecutorService _scheduledExecutorService;
550
551 private static class InvalidateProcessCallable
552 implements ProcessCallable<Serializable> {
553
554 public InvalidateProcessCallable(long companyId) {
555 _companyId = companyId;
556 }
557
558 @Override
559 public Serializable call() {
560 IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
561 _companyId);
562
563 indexAccessor.invalidate();
564
565 _invalidate(_companyId);
566
567 return null;
568 }
569
570 private static final long serialVersionUID = 1L;
571
572 private final long _companyId;
573
574 }
575
576 }