package com.liferay.portal.search.lucene;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
import com.liferay.portal.kernel.search.SearchEngineUtil;
import com.liferay.portal.kernel.util.FileUtil;
import com.liferay.portal.kernel.util.InstanceFactory;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
import com.liferay.portal.util.ClassLoaderUtil;
import com.liferay.portal.util.PropsValues;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NoMergeScheduler;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RAMDirectory;

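/**
 * Provides access to the Lucene index for a single company (portal
 * instance). All writes go through one shared {@link IndexWriter}, with
 * commits batched by count (PropsValues.LUCENE_COMMIT_BATCH_SIZE) and
 * flushed on a timer (PropsValues.LUCENE_COMMIT_TIME_INTERVAL).
 *
 * <p>
 * A minimal usage sketch (hypothetical standalone caller; within the portal,
 * accessors are normally obtained through LuceneHelperUtil rather than
 * constructed directly):
 * </p>
 *
 * <pre>
 * IndexAccessor indexAccessor = new IndexAccessorImpl(companyId);
 *
 * Document document = new Document();
 *
 * document.add(
 * 	new Field("title", "hello", Field.Store.YES, Field.Index.ANALYZED));
 *
 * indexAccessor.addDocument(document);
 *
 * indexAccessor.close();
 * </pre>
 */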
public class IndexAccessorImpl implements IndexAccessor {

	public IndexAccessorImpl(long companyId) {
		_companyId = companyId;

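		// An SPI process does not manage the index writer itself, so skip
		// local writer and commit scheduler initialization
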
		if (!SPIUtil.isSPI()) {
			_checkLuceneDir();
			_initIndexWriter();
			_initCommitScheduler();
		}
	}

	@Override
	public void addDocument(Document document) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		_write(null, document);
	}

	@Override
	public void addDocuments(Collection<Document> documents)
		throws IOException {

		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			for (Document document : documents) {
				_indexWriter.addDocument(document);
			}

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	@Override
	public void close() {
		if (SPIUtil.isSPI()) {
			return;
		}

		try {
			_indexWriter.close();
		}
		catch (Exception e) {
			_log.error("Closing Lucene writer failed for " + _companyId, e);
		}
	}

	@Override
	public void delete() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		_deleteDirectory();
	}

	@Override
	public void deleteDocuments(Term term) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			_indexWriter.deleteDocuments(term);

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	@Override
	public void dumpIndex(OutputStream outputStream) throws IOException {
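
		// Coordinate with _doCommit() through the shared commit lock so the
		// dumped commit is not swapped out while it streams
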
		_dumpIndexDeletionPolicy.dump(outputStream, _indexWriter, _commitLock);
	}

	@Override
	public long getCompanyId() {
		return _companyId;
	}

	@Override
	public long getLastGeneration() {
		return _dumpIndexDeletionPolicy.getLastGeneration();
	}

	@Override
	public Directory getLuceneDir() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
			return _getLuceneDirFile();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			throw new IllegalArgumentException(
				"Store type JDBC is no longer supported in favor of SOLR");
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
			return _getLuceneDirRam();
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

	@Override
	public void loadIndex(InputStream inputStream) throws IOException {
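
		// Deserialize the incoming index into a temporary directory, wipe
		// the live index, and copy each document back in through the
		// regular write path
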
		File tempFile = FileUtil.createTempFile();

		Directory tempDirectory = FSDirectory.open(tempFile);

		IndexCommitSerializationUtil.deserializeIndex(
			inputStream, tempDirectory);

		_deleteDirectory();

		IndexReader indexReader = IndexReader.open(tempDirectory, false);

		IndexSearcher indexSearcher = new IndexSearcher(indexReader);

		try {
			TopDocs topDocs = indexSearcher.search(
				new MatchAllDocsQuery(), indexReader.numDocs());

			ScoreDoc[] scoreDocs = topDocs.scoreDocs;

			for (ScoreDoc scoreDoc : scoreDocs) {
				Document document = indexSearcher.doc(scoreDoc.doc);

				addDocument(document);
			}
		}
		catch (IllegalArgumentException iae) {
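
			// IndexSearcher.search() rejects a numHits of 0, so an empty
			// incoming index lands here with nothing to copy
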
			if (_log.isDebugEnabled()) {
				_log.debug(iae.getMessage());
			}
		}

		indexSearcher.close();

		indexReader.flush();
		indexReader.close();

		tempDirectory.close();

		FileUtil.deltree(tempFile);
	}

	@Override
	public void updateDocument(Term term, Document document)
		throws IOException {

		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		if (_log.isDebugEnabled()) {
			_log.debug("Indexing " + document);
		}

		_write(term, document);
	}

	private void _checkLuceneDir() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

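		// Clear a stale write lock left behind by an unclean shutdown so a
		// new IndexWriter can take ownership of the directory
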
		try {
			Directory directory = getLuceneDir();

			if (IndexWriter.isLocked(directory)) {
				IndexWriter.unlock(directory);
			}
		}
		catch (Exception e) {
			_log.error("Check Lucene directory failed for " + _companyId, e);
		}
	}

	private void _commit() throws IOException {
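
		// A batch size of 0 disables batching, so every write commits
		// immediately; otherwise, commit once enough batches accumulate
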
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
			(PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {

			_doCommit();
		}
	}

	private void _deleteAll() {
		String path = _getPath();

		try {
			_indexWriter.deleteAll();

			_indexWriter.commit();
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Unable to delete index in directory " + path, e);
			}
		}
	}

	private void _deleteDirectory() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
			PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {

			_deleteAll();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			throw new IllegalArgumentException(
				"Store type JDBC is no longer supported in favor of SOLR");
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

	private void _doCommit() throws IOException {
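
		// Take the shared commit lock so a commit never runs while the
		// index is being dumped
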
		if (_indexWriter != null) {
			_commitLock.lock();

			try {
				_indexWriter.commit();
			}
			finally {
				_commitLock.unlock();
			}
		}

		_batchCount = 0;
	}

	private FSDirectory _getDirectory(String path) throws IOException {
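
		// Force memory-mapped files when configured; otherwise let Lucene
		// pick the best FSDirectory implementation for the platform
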
		if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
			return new MMapDirectory(new File(path));
		}
		else {
			return FSDirectory.open(new File(path));
		}
	}

	private Directory _getLuceneDirFile() {
		Directory directory = null;

		String path = _getPath();

		try {
			directory = _getDirectory(path);
		}
		catch (IOException ioe) {
			_log.error("Unable to open Lucene directory " + path, ioe);
		}

		return directory;
	}

	private Directory _getLuceneDirRam() {
		String path = _getPath();

		Directory directory = _ramDirectories.get(path);

		if (directory == null) {
			directory = new RAMDirectory();

			_ramDirectories.put(path, directory);
		}

		return directory;
	}

	private MergePolicy _getMergePolicy() throws Exception {
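
		// NoMergePolicy only exposes singleton instances, so it cannot be
		// instantiated reflectively like the other merge policies
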
		if (PropsValues.LUCENE_MERGE_POLICY.equals(
				NoMergePolicy.class.getName())) {

			return NoMergePolicy.NO_COMPOUND_FILES;
		}

		ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();

		MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
			classLoader, PropsValues.LUCENE_MERGE_POLICY);

		if (mergePolicy instanceof LogMergePolicy) {
			LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;

			logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
		}

		return mergePolicy;
	}

	private MergeScheduler _getMergeScheduler() throws Exception {
		if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
				NoMergeScheduler.class.getName())) {

			return NoMergeScheduler.INSTANCE;
		}

		ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();

		return (MergeScheduler)InstanceFactory.newInstance(
			classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
	}

	private String _getPath() {
		return PropsValues.LUCENE_DIR.concat(
			String.valueOf(_companyId)).concat(StringPool.SLASH);
	}

	private void _initCommitScheduler() {
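
		// Time-based commits only matter when batching is enabled; without
		// batching, every write already commits immediately
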
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
			(PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {

			return;
		}

		ScheduledExecutorService scheduledExecutorService =
			Executors.newSingleThreadScheduledExecutor();

		Runnable runnable = new Runnable() {

			@Override
			public void run() {
				try {
					if (_batchCount > 0) {
						_doCommit();
					}
				}
				catch (IOException ioe) {
					_log.error("Could not run scheduled commit", ioe);
				}
			}

		};

		scheduledExecutorService.scheduleWithFixedDelay(
			runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
			TimeUnit.MILLISECONDS);
	}

	private void _initIndexWriter() {
		try {
			Analyzer analyzer = new LimitTokenCountAnalyzer(
				LuceneHelperUtil.getAnalyzer(),
				PropsValues.LUCENE_ANALYZER_MAX_TOKENS);

			IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
				LuceneHelperUtil.getVersion(), analyzer);

			indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
			indexWriterConfig.setMergePolicy(_getMergePolicy());
			indexWriterConfig.setMergeScheduler(_getMergeScheduler());
			indexWriterConfig.setRAMBufferSizeMB(
				PropsValues.LUCENE_BUFFER_SIZE);

			_indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);

			if (!IndexReader.indexExists(getLuceneDir())) {

				// Commit the empty writer so the index files exist and the
				// index becomes readable

				if (_log.isDebugEnabled()) {
					_log.debug("Creating missing index");
				}

				_doCommit();
			}
		}
		catch (Exception e) {
			_log.error(
				"Initializing Lucene writer failed for " + _companyId, e);
		}
	}

	private void _write(Term term, Document document) throws IOException {
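
		// With a term, replace any document matching that term; without
		// one, append the document
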
		try {
			if (term != null) {
				_indexWriter.updateDocument(term, document);
			}
			else {
				_indexWriter.addDocument(document);
			}

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	private static final String _LUCENE_STORE_TYPE_FILE = "file";

	private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";

	private static final String _LUCENE_STORE_TYPE_RAM = "ram";

	private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);

	private volatile int _batchCount;
	private Lock _commitLock = new ReentrantLock();
	private long _companyId;
	private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
		new DumpIndexDeletionPolicy();
	private IndexWriter _indexWriter;
	private Map<String, Directory> _ramDirectories =
		new ConcurrentHashMap<String, Directory>();

}