001
014
015 package com.liferay.portal.search.lucene;
016
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
import com.liferay.portal.kernel.search.SearchEngineUtil;
import com.liferay.portal.kernel.util.FileUtil;
import com.liferay.portal.kernel.util.InstanceFactory;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
import com.liferay.portal.util.ClassLoaderUtil;
import com.liferay.portal.util.PropsValues;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RAMDirectory;
060
061
068 public class IndexAccessorImpl implements IndexAccessor {
069
070 public IndexAccessorImpl(long companyId) {
071 _companyId = companyId;
072
073 if (!SPIUtil.isSPI()) {
074 _checkLuceneDir();
075 _initIndexWriter();
076 _initCommitScheduler();
077 }
078 }
079
080 @Override
081 public void addDocument(Document document) throws IOException {
082 if (SearchEngineUtil.isIndexReadOnly()) {
083 return;
084 }
085
086 _write(null, document);
087 }
088
089 @Override
090 public void addDocuments(Collection<Document> documents)
091 throws IOException {
092
093 try {
094 for (Document document : documents) {
095 _indexWriter.addDocument(document);
096 }
097
098 _batchCount++;
099 }
100 finally {
101 _commit();
102 }
103 }
104
105 @Override
106 public void close() {
107 if (SPIUtil.isSPI()) {
108 return;
109 }
110
111 try {
112 _indexWriter.close();
113 }
114 catch (Exception e) {
115 _log.error("Closing Lucene writer failed for " + _companyId, e);
116 }
117 }
118
119 @Override
120 public void delete() {
121 if (SearchEngineUtil.isIndexReadOnly()) {
122 return;
123 }
124
125 _deleteDirectory();
126 }
127
128 @Override
129 public void deleteDocuments(Term term) throws IOException {
130 if (SearchEngineUtil.isIndexReadOnly()) {
131 return;
132 }
133
134 try {
135 _indexWriter.deleteDocuments(term);
136
137 _batchCount++;
138 }
139 finally {
140 _commit();
141 }
142 }
143
144 @Override
145 public void dumpIndex(OutputStream outputStream) throws IOException {
146 _dumpIndexDeletionPolicy.dump(outputStream, _indexWriter, _commitLock);
147 }
148
149 @Override
150 public long getCompanyId() {
151 return _companyId;
152 }
153
154 @Override
155 public long getLastGeneration() {
156 return _dumpIndexDeletionPolicy.getLastGeneration();
157 }
158
159 @Override
160 public Directory getLuceneDir() {
161 if (_log.isDebugEnabled()) {
162 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
163 }
164
165 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
166 return _getLuceneDirFile();
167 }
168 else if (PropsValues.LUCENE_STORE_TYPE.equals(
169 _LUCENE_STORE_TYPE_JDBC)) {
170
171 throw new IllegalArgumentException(
172 "Store type JDBC is no longer supported in favor of SOLR");
173 }
174 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
175 return _getLuceneDirRam();
176 }
177 else {
178 throw new RuntimeException(
179 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
180 }
181 }
182
183 @Override
184 public void loadIndex(InputStream inputStream) throws IOException {
185 File tempFile = FileUtil.createTempFile();
186
187 Directory tempDirectory = FSDirectory.open(tempFile);
188
189 IndexCommitSerializationUtil.deserializeIndex(
190 inputStream, tempDirectory);
191
192 _deleteDirectory();
193
194 IndexReader indexReader = IndexReader.open(tempDirectory, false);
195
196 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
197
198 try {
199 TopDocs topDocs = indexSearcher.search(
200 new MatchAllDocsQuery(), indexReader.numDocs());
201
202 ScoreDoc[] scoreDocs = topDocs.scoreDocs;
203
204 for (ScoreDoc scoreDoc : scoreDocs) {
205 Document document = indexSearcher.doc(scoreDoc.doc);
206
207 addDocument(document);
208 }
209 }
210 catch (IllegalArgumentException iae) {
211 if (_log.isDebugEnabled()) {
212 _log.debug(iae.getMessage());
213 }
214 }
215
216 indexSearcher.close();
217
218 indexReader.flush();
219 indexReader.close();
220
221 tempDirectory.close();
222
223 FileUtil.deltree(tempFile);
224 }
225
226 @Override
227 public void updateDocument(Term term, Document document)
228 throws IOException {
229
230 if (SearchEngineUtil.isIndexReadOnly()) {
231 return;
232 }
233
234 if (_log.isDebugEnabled()) {
235 _log.debug("Indexing " + document);
236 }
237
238 _write(term, document);
239 }
240
241 private void _checkLuceneDir() {
242 if (SearchEngineUtil.isIndexReadOnly()) {
243 return;
244 }
245
246 try {
247 Directory directory = getLuceneDir();
248
249 if (IndexWriter.isLocked(directory)) {
250 IndexWriter.unlock(directory);
251 }
252 }
253 catch (Exception e) {
254 _log.error("Check Lucene directory failed for " + _companyId, e);
255 }
256 }
257
258 private void _commit() throws IOException {
259 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
260 (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
261
262 _doCommit();
263 }
264 }
265
266 private void _deleteDirectory() {
267 if (_log.isDebugEnabled()) {
268 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
269 }
270
271 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
272 _deleteFile();
273 }
274 else if (PropsValues.LUCENE_STORE_TYPE.equals(
275 _LUCENE_STORE_TYPE_JDBC)) {
276
277 throw new IllegalArgumentException(
278 "Store type JDBC is no longer supported in favor of SOLR");
279 }
280 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
281 _deleteRam();
282 }
283 else {
284 throw new RuntimeException(
285 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
286 }
287 }
288
289 private void _deleteFile() {
290 String path = _getPath();
291
292 try {
293 _indexWriter.deleteAll();
294
295
296
297 _indexWriter.commit();
298 }
299 catch (Exception e) {
300 if (_log.isWarnEnabled()) {
301 _log.warn("Could not delete index in directory " + path);
302 }
303 }
304 }
305
306 private void _deleteRam() {
307 }
308
309 private void _doCommit() throws IOException {
310 if (_indexWriter != null) {
311 _commitLock.lock();
312
313 try {
314 _indexWriter.commit();
315 }
316 finally {
317 _commitLock.unlock();
318 }
319 }
320
321 _batchCount = 0;
322 }
323
324 private FSDirectory _getDirectory(String path) throws IOException {
325 if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
326 return new MMapDirectory(new File(path));
327 }
328 else {
329 return FSDirectory.open(new File(path));
330 }
331 }
332
333 private Directory _getLuceneDirFile() {
334 Directory directory = null;
335
336 String path = _getPath();
337
338 try {
339 directory = _getDirectory(path);
340 }
341 catch (IOException ioe) {
342 if (directory != null) {
343 try {
344 directory.close();
345 }
346 catch (Exception e) {
347 }
348 }
349 }
350
351 return directory;
352 }
353
354 private Directory _getLuceneDirRam() {
355 String path = _getPath();
356
357 Directory directory = _ramDirectories.get(path);
358
359 if (directory == null) {
360 directory = new RAMDirectory();
361
362 _ramDirectories.put(path, directory);
363 }
364
365 return directory;
366 }
367
368 private MergePolicy _getMergePolicy() throws Exception {
369 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
370
371 MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
372 classLoader, PropsValues.LUCENE_MERGE_POLICY);
373
374 if (mergePolicy instanceof LogMergePolicy) {
375 LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
376
377 logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
378 }
379
380 return mergePolicy;
381 }
382
383 private String _getPath() {
384 return PropsValues.LUCENE_DIR.concat(String.valueOf(_companyId)).concat(
385 StringPool.SLASH);
386 }
387
388 private void _initCommitScheduler() {
389 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
390 (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
391
392 return;
393 }
394
395 ScheduledExecutorService scheduledExecutorService =
396 Executors.newSingleThreadScheduledExecutor();
397
398 Runnable runnable = new Runnable() {
399
400 @Override
401 public void run() {
402 try {
403 if (_batchCount > 0) {
404 _doCommit();
405 }
406 }
407 catch (IOException ioe) {
408 _log.error("Could not run scheduled commit", ioe);
409 }
410 }
411
412 };
413
414 scheduledExecutorService.scheduleWithFixedDelay(
415 runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
416 TimeUnit.MILLISECONDS);
417 }
418
419 private void _initIndexWriter() {
420 try {
421 Analyzer analyzer = new LimitTokenCountAnalyzer(
422 LuceneHelperUtil.getAnalyzer(),
423 PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
424
425 IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
426 LuceneHelperUtil.getVersion(), analyzer);
427
428 indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
429 indexWriterConfig.setMergePolicy(_getMergePolicy());
430 indexWriterConfig.setRAMBufferSizeMB(
431 PropsValues.LUCENE_BUFFER_SIZE);
432
433 _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
434
435 if (!IndexReader.indexExists(getLuceneDir())) {
436
437
438
439 if (_log.isDebugEnabled()) {
440 _log.debug("Creating missing index");
441 }
442
443 _doCommit();
444 }
445 }
446 catch (Exception e) {
447 _log.error(
448 "Initializing Lucene writer failed for " + _companyId, e);
449 }
450 }
451
452 private void _write(Term term, Document document) throws IOException {
453 try {
454 if (term != null) {
455 _indexWriter.updateDocument(term, document);
456 }
457 else {
458 _indexWriter.addDocument(document);
459 }
460
461 _batchCount++;
462 }
463 finally {
464 _commit();
465 }
466 }
467
468 private static final String _LUCENE_STORE_TYPE_FILE = "file";
469
470 private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
471
472 private static final String _LUCENE_STORE_TYPE_RAM = "ram";
473
474 private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
475
476 private volatile int _batchCount;
477 private Lock _commitLock = new ReentrantLock();
478 private long _companyId;
479 private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
480 new DumpIndexDeletionPolicy();
481 private IndexWriter _indexWriter;
482 private Map<String, Directory> _ramDirectories =
483 new ConcurrentHashMap<String, Directory>();
484
485 }