package com.liferay.portal.search.lucene;

import com.liferay.portal.kernel.dao.jdbc.DataAccess;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.search.SearchEngineUtil;
import com.liferay.portal.kernel.util.CharPool;
import com.liferay.portal.kernel.util.FileUtil;
import com.liferay.portal.kernel.util.InfrastructureUtil;
import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
import com.liferay.portal.kernel.util.PropsKeys;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
import com.liferay.portal.search.lucene.store.jdbc.LiferayJdbcDirectory;
import com.liferay.portal.util.PropsUtil;
import com.liferay.portal.util.PropsValues;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.sql.DataSource;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.jdbc.JdbcDirectory;
import org.apache.lucene.store.jdbc.JdbcStoreException;
import org.apache.lucene.store.jdbc.dialect.Dialect;
import org.apache.lucene.store.jdbc.lock.JdbcLock;
import org.apache.lucene.store.jdbc.support.JdbcTemplate;

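/**
 * Provides access to the Lucene index of a single company. The backing store
 * is selected by the <code>lucene.store.type</code> portal property and may
 * be a file system directory, a JDBC table, or an in-memory directory.
 * Writes are batched and committed either when the configured commit batch
 * size is reached or by a scheduled background commit.
 */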
public class IndexAccessorImpl implements IndexAccessor {

	public IndexAccessorImpl(long companyId) {
		_companyId = companyId;

		_initDialect();
		_checkLuceneDir();
		_initIndexWriter();
		_initCleanupJdbcScheduler();
		_initCommitScheduler();
	}

	public void addDocument(Document document) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		_write(null, document);
	}

	public void close() {
		try {
			_indexWriter.close();
		}
		catch (Exception e) {
			_log.error("Closing Lucene writer failed for " + _companyId, e);
		}
	}

	public void delete() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		close();

		_deleteDirectory();

		_initIndexWriter();
	}

	public void deleteDocuments(Term term) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			_indexWriter.deleteDocuments(term);

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	public void dumpIndex(OutputStream outputStream) throws IOException {
		_dumpIndexDeletionPolicy.dump(outputStream, _indexWriter, _commitLock);
	}

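	// Index dumping stays disabled until enableDumpIndex() is called. Until
	// then, getLastGeneration() reports DEFAULT_LAST_GENERATION so callers
	// do not try to dump or load an index that is not ready.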
	public void enableDumpIndex() {
		_countDownLatch.countDown();
	}

	public long getCompanyId() {
		return _companyId;
	}

	public long getLastGeneration() {
		if (_countDownLatch.getCount() > 0) {
			return DEFAULT_LAST_GENERATION;
		}

		return _dumpIndexDeletionPolicy.getLastGeneration();
	}

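	// Returns the Lucene directory matching the configured store type,
	// creating it on first access. An unrecognized store type is a
	// configuration error and fails fast.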
	public Directory getLuceneDir() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
			return _getLuceneDirFile();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			return _getLuceneDirJdbc();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
			return _getLuceneDirRam();
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

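	// Replaces the live index with one deserialized from the given stream.
	// The incoming index is first materialized in a temporary file system
	// directory, the current index is deleted, the contents are copied into
	// the live directory, and the writer is then reopened.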
	public void loadIndex(InputStream inputStream) throws IOException {
		File tempFile = FileUtil.createTempFile();

		Directory tempDirectory = FSDirectory.open(tempFile);

		IndexCommitSerializationUtil.deserializeIndex(
			inputStream, tempDirectory);

		close();

		_deleteDirectory();

		Directory.copy(tempDirectory, getLuceneDir(), true);

		_initIndexWriter();

		tempDirectory.close();

		FileUtil.deltree(tempFile);
	}

	public void updateDocument(Term term, Document document)
		throws IOException {

		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		if (_log.isDebugEnabled()) {
			_log.debug("Indexing " + document);
		}

		_write(term, document);
	}

	private void _checkLuceneDir() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			Directory directory = getLuceneDir();

			if (IndexWriter.isLocked(directory)) {
				IndexWriter.unlock(directory);
			}
		}
		catch (Exception e) {
			_log.error("Check Lucene directory failed for " + _companyId, e);
		}
	}

	private void _cleanUpJdbcDirectories() {
		for (String tableName : _jdbcDirectories.keySet()) {
			JdbcDirectory jdbcDirectory = (JdbcDirectory)_jdbcDirectories.get(
				tableName);

			try {
				jdbcDirectory.deleteMarkDeleted(60000);
			}
			catch (IOException ioe) {
				if (_log.isWarnEnabled()) {
					_log.warn(
						"Could not clean up JDBC directory " + tableName, ioe);
				}
			}
		}
	}

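	// Commits pending changes when batching is disabled (a batch size of 0
	// commits after every write) or when the number of buffered operations
	// has reached the configured batch size.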
	private void _commit() throws IOException {
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
			(PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {

			_doCommit();
		}
	}

	private void _deleteDirectory() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
			_deleteFile();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			_deleteJdbc();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
			_deleteRam();
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

	private void _deleteFile() {
		String path = _getPath();

		try {
			Directory directory = _getDirectory(path);

			directory.close();
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not close directory " + path, e);
			}
		}

		FileUtil.deltree(path);
	}

	private void _deleteJdbc() {
		String tableName = _getTableName();

		try {
			Directory directory = _jdbcDirectories.remove(tableName);

			if (directory != null) {
				directory.close();
			}
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not close directory " + tableName, e);
			}
		}

		Connection con = null;
		Statement s = null;

		try {
			con = DataAccess.getConnection();

			s = con.createStatement();

			s.executeUpdate("DELETE FROM " + tableName);
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not truncate " + tableName, e);
			}
		}
		finally {
			DataAccess.cleanUp(con, s);
		}
	}

	private void _deleteRam() {

		// There is no persistent state to remove for an in-memory index

	}

	private void _doCommit() throws IOException {
		if (_indexWriter != null) {
			_commitLock.lock();

			try {
				_indexWriter.commit();
			}
			finally {
				_commitLock.unlock();
			}
		}

		_batchCount = 0;
	}

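	// Opens a file system directory for the given path, forcing
	// MMapDirectory when PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP is
	// set and otherwise letting Lucene pick the best implementation for the
	// platform via FSDirectory.open()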
	private FSDirectory _getDirectory(String path) throws IOException {
		if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
			return new MMapDirectory(new File(path));
		}
		else {
			return FSDirectory.open(new File(path));
		}
	}

	private Directory _getLuceneDirFile() {
		Directory directory = null;

		String path = _getPath();

		try {
			directory = _getDirectory(path);
		}
		catch (IOException ioe) {
			_log.error("Could not open directory " + path, ioe);
		}

		return directory;
	}

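	// Returns the JDBC-backed directory for this company, creating and
	// caching it on first use. The thread context class loader is swapped to
	// the portal class loader so the JDBC store can load its classes, and
	// the backing table is created if it does not exist yet.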
	private Directory _getLuceneDirJdbc() {
		JdbcDirectory jdbcDirectory = null;

		Thread currentThread = Thread.currentThread();

		ClassLoader contextClassLoader = currentThread.getContextClassLoader();

		try {
			currentThread.setContextClassLoader(
				PortalClassLoaderUtil.getClassLoader());

			String tableName = _getTableName();

			jdbcDirectory = (JdbcDirectory)_jdbcDirectories.get(tableName);

			if (jdbcDirectory != null) {
				return jdbcDirectory;
			}

			try {
				DataSource dataSource = InfrastructureUtil.getDataSource();

				jdbcDirectory = new LiferayJdbcDirectory(
					dataSource, _dialect, tableName);

				_jdbcDirectories.put(tableName, jdbcDirectory);

				if (!jdbcDirectory.tableExists()) {
					jdbcDirectory.create();
				}
			}
			catch (IOException ioe) {
				throw new RuntimeException(ioe);
			}
			catch (UnsupportedOperationException uoe) {
				if (_log.isWarnEnabled()) {
					_log.warn(
						"Database doesn't support the ability to check " +
							"whether a table exists");
				}

				_manuallyCreateJdbcDirectory(jdbcDirectory, tableName);
			}
		}
		finally {
			currentThread.setContextClassLoader(contextClassLoader);
		}

		return jdbcDirectory;
	}

	private Directory _getLuceneDirRam() {
		String path = _getPath();

		Directory directory = _ramDirectories.get(path);

		if (directory == null) {
			directory = new RAMDirectory();

			_ramDirectories.put(path, directory);
		}

		return directory;
	}

	private String _getPath() {
		return PropsValues.LUCENE_DIR.concat(String.valueOf(_companyId)).concat(
			StringPool.SLASH);
	}

	private String _getTableName() {
		return _LUCENE_TABLE_PREFIX + _companyId;
	}

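	// Starts a single-threaded scheduler that periodically flushes pending
	// writes, so batched changes still become searchable even when the batch
	// size is never reached. Skipped when batched commits or the time
	// interval are disabled.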
	private void _initCommitScheduler() {
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
			(PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {

			return;
		}

		ScheduledExecutorService scheduledExecutorService =
			Executors.newSingleThreadScheduledExecutor();

		Runnable runnable = new Runnable() {

			public void run() {
				try {
					if (_batchCount > 0) {
						_doCommit();
					}
				}
				catch (IOException ioe) {
					_log.error("Could not run scheduled commit", ioe);
				}
			}

		};

		scheduledExecutorService.scheduleWithFixedDelay(
			runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
			TimeUnit.MILLISECONDS);
	}

	// Periodically removes index files that the JDBC store has marked as
	// deleted. Only runs for the JDBC store type with auto clean up enabled.

	private void _initCleanupJdbcScheduler() {
		if (!PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_JDBC) ||
			!PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {

			return;
		}

		ScheduledExecutorService scheduledExecutorService =
			Executors.newSingleThreadScheduledExecutor();

		Runnable runnable = new Runnable() {

			public void run() {
				_cleanUpJdbcDirectories();
			}

		};

		scheduledExecutorService.scheduleWithFixedDelay(
			runnable, 0,
			PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL * 60L,
			TimeUnit.SECONDS);
	}

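	// Resolves the JDBC store dialect from the connection URL. The
	// subprotocol between the first two colons (for example "mysql" in
	// "jdbc:mysql://...") is appended to PropsKeys.LUCENE_STORE_JDBC_DIALECT
	// to look up the dialect class name.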
	private void _initDialect() {
		if (!PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_JDBC)) {
			return;
		}

		Connection con = null;

		try {
			con = DataAccess.getConnection();

			String url = con.getMetaData().getURL();

			int x = url.indexOf(CharPool.COLON);
			int y = url.indexOf(CharPool.COLON, x + 1);

			String urlPrefix = url.substring(x + 1, y);

			String dialectClass = PropsUtil.get(
				PropsKeys.LUCENE_STORE_JDBC_DIALECT + urlPrefix);

			if (_log.isDebugEnabled()) {
				if (dialectClass != null) {
					_log.debug("JDBC class implementation " + dialectClass);
				}
				else {
					_log.debug("JDBC class implementation is null");
				}
			}

			if (dialectClass != null) {
				_dialect = (Dialect)Class.forName(dialectClass).newInstance();
			}
		}
		catch (Exception e) {
			_log.error(e, e);
		}
		finally {
			DataAccess.cleanUp(con);
		}

		if (_dialect == null) {
			_log.error("No JDBC dialect found");
		}
	}

	private void _initIndexWriter() {
		try {
			_indexWriter = new IndexWriter(
				getLuceneDir(), LuceneHelperUtil.getAnalyzer(),
				_dumpIndexDeletionPolicy, IndexWriter.MaxFieldLength.LIMITED);

			_indexWriter.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
			_indexWriter.setRAMBufferSizeMB(PropsValues.LUCENE_BUFFER_SIZE);
		}
		catch (Exception e) {
			_log.error(
				"Initializing Lucene writer failed for " + _companyId, e);
		}
	}

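	// Fallback for databases where JdbcDirectory.tableExists() is not
	// supported. The table is looked up through DatabaseMetaData and, if
	// missing, is created directly, after which the configured JdbcLock
	// implementation initializes its own database state.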
	private void _manuallyCreateJdbcDirectory(
		JdbcDirectory jdbcDirectory, String tableName) {

		Connection con = null;
		ResultSet rs = null;

		try {
			con = DataAccess.getConnection();

			// Check whether the table exists

			DatabaseMetaData databaseMetaData = con.getMetaData();

			rs = databaseMetaData.getTables(null, null, tableName, null);

			if (!rs.next()) {
				JdbcTemplate jdbcTemplate = jdbcDirectory.getJdbcTemplate();

				jdbcTemplate.executeUpdate(
					jdbcDirectory.getTable().sqlCreate());

				Class<?> lockClass = jdbcDirectory.getSettings().getLockClass();

				JdbcLock jdbcLock = null;

				try {
					jdbcLock = (JdbcLock)lockClass.newInstance();
				}
				catch (Exception e) {
					throw new JdbcStoreException(
						"Could not create lock class " + lockClass);
				}

				jdbcLock.initializeDatabase(jdbcDirectory);
			}
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not create " + tableName, e);
			}
		}
		finally {
			DataAccess.cleanUp(con, null, rs);
		}
	}

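	// Applies an add or update to the index, optimizing every
	// LUCENE_OPTIMIZE_INTERVAL writes (or on every write when the interval
	// is 0) and deferring the commit to the batching logic in _commit()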
	private void _write(Term term, Document document) throws IOException {
		try {
			if (term != null) {
				_indexWriter.updateDocument(term, document);
			}
			else {
				_indexWriter.addDocument(document);
			}

			_optimizeCount++;

			if ((PropsValues.LUCENE_OPTIMIZE_INTERVAL == 0) ||
				(_optimizeCount >= PropsValues.LUCENE_OPTIMIZE_INTERVAL)) {

				_indexWriter.optimize();

				_optimizeCount = 0;
			}

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	private static final String _LUCENE_STORE_TYPE_FILE = "file";

	private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";

	private static final String _LUCENE_STORE_TYPE_RAM = "ram";

	private static final String _LUCENE_TABLE_PREFIX = "LUCENE_";

	private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);

	private volatile int _batchCount;
	private Lock _commitLock = new ReentrantLock();
	private long _companyId;
	private CountDownLatch _countDownLatch = new CountDownLatch(1);
	private Dialect _dialect;
	private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
		new DumpIndexDeletionPolicy();
	private IndexWriter _indexWriter;
	private Map<String, Directory> _jdbcDirectories =
		new ConcurrentHashMap<String, Directory>();
	private int _optimizeCount;
	private Map<String, Directory> _ramDirectories =
		new ConcurrentHashMap<String, Directory>();

}