package com.liferay.portal.search.lucene;

import com.liferay.portal.kernel.dao.jdbc.DataAccess;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.search.SearchEngineUtil;
import com.liferay.portal.kernel.util.CharPool;
import com.liferay.portal.kernel.util.FileUtil;
import com.liferay.portal.kernel.util.InfrastructureUtil;
import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
import com.liferay.portal.kernel.util.PropsKeys;
import com.liferay.portal.kernel.util.StringPool;
import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
import com.liferay.portal.search.lucene.store.jdbc.LiferayJdbcDirectory;
import com.liferay.portal.util.PropsUtil;
import com.liferay.portal.util.PropsValues;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.sql.DataSource;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.jdbc.JdbcDirectory;
import org.apache.lucene.store.jdbc.JdbcStoreException;
import org.apache.lucene.store.jdbc.dialect.Dialect;
import org.apache.lucene.store.jdbc.lock.JdbcLock;
import org.apache.lucene.store.jdbc.support.JdbcTemplate;
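
/**
 * Provides access to the Lucene index for a single company, wrapping one
 * shared {@link IndexWriter} and the file, JDBC, or RAM backed directory
 * that stores it.
 */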
public class IndexAccessorImpl implements IndexAccessor {

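	// Initialization order matters: the JDBC dialect must be resolved before
	// any directory is opened, stale locks must be cleared before the writer
	// is created, and the schedulers start only once the writer exists.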
	public IndexAccessorImpl(long companyId) {
		_companyId = companyId;

		_initDialect();
		_checkLuceneDir();
		_initIndexWriter();
		_initCleanupJdbcScheduler();
		_initCommitScheduler();
	}

	public void addDocument(Document document) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		_write(null, document);
	}

	public void close() {
		try {
			_indexWriter.close();
		}
		catch (Exception e) {
			_log.error("Closing Lucene writer failed for " + _companyId, e);
		}
	}

	public void delete() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		close();

		_deleteDirectory();

		_initIndexWriter();
	}

	public void deleteDocuments(Term term) throws IOException {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			_indexWriter.deleteDocuments(term);

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	public void dumpIndex(OutputStream outputStream) throws IOException {
		_dumpIndexDeletionPolicy.dump(outputStream, _indexWriter, _commitLock);
	}

	public void enableDumpIndex() {
		_countDownLatch.countDown();
	}

	public long getCompanyId() {
		return _companyId;
	}

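	// Dumping is gated by the countdown latch: until enableDumpIndex() is
	// called, report DEFAULT_LAST_GENERATION to signal that no index dump is
	// available yet.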
	public long getLastGeneration() {
		if (_countDownLatch.getCount() > 0) {
			return DEFAULT_LAST_GENERATION;
		}

		return _dumpIndexDeletionPolicy.getLastGeneration();
	}

	public Directory getLuceneDir() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
			return _getLuceneDirFile();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			return _getLuceneDirJdbc();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
			return _getLuceneDirRam();
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

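	// Replace the live index with a serialized dump: deserialize into a
	// temporary directory, swap its contents in, and reopen the writer.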
	public void loadIndex(InputStream inputStream) throws IOException {
		File tempFile = FileUtil.createTempFile();

		Directory tempDirectory = FSDirectory.open(tempFile);

		IndexCommitSerializationUtil.deserializeIndex(
			inputStream, tempDirectory);

		close();

		_deleteDirectory();

		Directory.copy(tempDirectory, getLuceneDir(), true);

		_initIndexWriter();

		tempDirectory.close();

		FileUtil.deltree(tempFile);
	}

	public void updateDocument(Term term, Document document)
		throws IOException {

		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		if (_log.isDebugEnabled()) {
			_log.debug("Indexing " + document);
		}

		_write(term, document);
	}

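	// Clear any stale write lock left behind by an unclean shutdown so that
	// a new IndexWriter can open the directory.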
	private void _checkLuceneDir() {
		if (SearchEngineUtil.isIndexReadOnly()) {
			return;
		}

		try {
			Directory directory = getLuceneDir();

			if (IndexWriter.isLocked(directory)) {
				IndexWriter.unlock(directory);
			}
		}
		catch (Exception e) {
			_log.error("Check Lucene directory failed for " + _companyId, e);
		}
	}

	private void _cleanUpJdbcDirectories() {
		for (String tableName : _jdbcDirectories.keySet()) {
			JdbcDirectory jdbcDirectory = (JdbcDirectory)_jdbcDirectories.get(
				tableName);

			try {
				jdbcDirectory.deleteMarkDeleted(60000);
			}
			catch (IOException ioe) {
				if (_log.isWarnEnabled()) {
					_log.warn(
						"Could not clean up JDBC directory " + tableName, ioe);
				}
			}
		}
	}

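	// A batch size of 0 means commit on every write; otherwise commit only
	// once the number of buffered operations reaches the configured batch
	// size.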
	private void _commit() throws IOException {
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
			(PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {

			_doCommit();
		}
	}

	private void _deleteDirectory() {
		if (_log.isDebugEnabled()) {
			_log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
		}

		if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
			_deleteFile();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(
					_LUCENE_STORE_TYPE_JDBC)) {

			_deleteJdbc();
		}
		else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
			_deleteRam();
		}
		else {
			throw new RuntimeException(
				"Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
		}
	}

	private void _deleteFile() {
		String path = _getPath();

		try {
			Directory directory = _getDirectory(path);

			directory.close();
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not close directory " + path, e);
			}
		}

		FileUtil.deltree(path);
	}

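	// Close the cached directory, then empty the backing table with a plain
	// DELETE so the table itself remains in place for reuse.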
	private void _deleteJdbc() {
		String tableName = _getTableName();

		try {
			Directory directory = _jdbcDirectories.remove(tableName);

			if (directory != null) {
				directory.close();
			}
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not close directory " + tableName, e);
			}
		}

		Connection con = null;
		Statement s = null;

		try {
			con = DataAccess.getConnection();

			s = con.createStatement();

			s.executeUpdate("DELETE FROM " + tableName);
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not truncate " + tableName, e);
			}
		}
		finally {
			DataAccess.cleanUp(con, s);
		}
	}

	private void _deleteRam() {
	}

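	// The commit lock keeps index commits and index dump serialization
	// mutually exclusive; see dumpIndex().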
	private void _doCommit() throws IOException {
		if (_indexWriter != null) {
			_commitLock.lock();

			try {
				_indexWriter.commit();
			}
			finally {
				_commitLock.unlock();
			}
		}

		_batchCount = 0;
	}

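	// MMapDirectory maps index files directly into memory, avoiding an extra
	// copy through a buffer; the force MMAP property lets deployments opt in
	// instead of relying on the platform default chosen by FSDirectory.open.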
	private FSDirectory _getDirectory(String path) throws IOException {
		if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
			return new MMapDirectory(new File(path));
		}
		else {
			return FSDirectory.open(new File(path));
		}
	}

	private Directory _getLuceneDirFile() {
		Directory directory = null;

		String path = _getPath();

		try {
			directory = _getDirectory(path);
		}
		catch (IOException ioe) {
			_log.error("Could not open directory " + path, ioe);
		}

		return directory;
	}

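	// Lucene's JDBC store resolves classes through the thread context class
	// loader, so temporarily swap in the portal class loader while creating
	// the directory. Created directories are cached per table name.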
	private Directory _getLuceneDirJdbc() {
		JdbcDirectory jdbcDirectory = null;

		Thread currentThread = Thread.currentThread();

		ClassLoader contextClassLoader = currentThread.getContextClassLoader();

		try {
			currentThread.setContextClassLoader(
				PortalClassLoaderUtil.getClassLoader());

			String tableName = _getTableName();

			jdbcDirectory = (JdbcDirectory)_jdbcDirectories.get(tableName);

			if (jdbcDirectory != null) {
				return jdbcDirectory;
			}

			try {
				DataSource dataSource = InfrastructureUtil.getDataSource();

				jdbcDirectory = new LiferayJdbcDirectory(
					dataSource, _dialect, tableName);

				_jdbcDirectories.put(tableName, jdbcDirectory);

				if (!jdbcDirectory.tableExists()) {
					jdbcDirectory.create();
				}
			}
			catch (IOException ioe) {
				throw new RuntimeException(ioe);
			}
			catch (UnsupportedOperationException uoe) {
				if (_log.isWarnEnabled()) {
					_log.warn(
						"Database does not support checking whether a table " +
							"exists");
				}

				_manuallyCreateJdbcDirectory(jdbcDirectory, tableName);
			}
		}
		finally {
			currentThread.setContextClassLoader(contextClassLoader);
		}

		return jdbcDirectory;
	}

	private Directory _getLuceneDirRam() {
		String path = _getPath();

		Directory directory = _ramDirectories.get(path);

		if (directory == null) {
			directory = new RAMDirectory();

			_ramDirectories.put(path, directory);
		}

		return directory;
	}

	private String _getPath() {
		return PropsValues.LUCENE_DIR.concat(String.valueOf(_companyId)).concat(
			StringPool.SLASH);
	}

	private String _getTableName() {
		return _LUCENE_TABLE_PREFIX + _companyId;
	}

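	// Periodically purge index files that the JDBC store only marked as
	// deleted. The cleanup interval property is in minutes, hence the
	// conversion to seconds below.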
	private void _initCleanupJdbcScheduler() {
		if (!PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_JDBC) ||
			!PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_ENABLED) {

			return;
		}

		ScheduledExecutorService scheduledExecutorService =
			Executors.newSingleThreadScheduledExecutor();

		Runnable runnable = new Runnable() {

			public void run() {
				_cleanUpJdbcDirectories();
			}

		};

		scheduledExecutorService.scheduleWithFixedDelay(
			runnable, 0,
			PropsValues.LUCENE_STORE_JDBC_AUTO_CLEAN_UP_INTERVAL * 60L,
			TimeUnit.SECONDS);
	}

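	// Time-based commits complement batch-based commits: buffered work is
	// flushed at least every LUCENE_COMMIT_TIME_INTERVAL milliseconds even
	// if the batch size threshold has not been reached.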
	private void _initCommitScheduler() {
		if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
			(PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {

			return;
		}

		ScheduledExecutorService scheduledExecutorService =
			Executors.newSingleThreadScheduledExecutor();

		Runnable runnable = new Runnable() {

			public void run() {
				try {
					if (_batchCount > 0) {
						_doCommit();
					}
				}
				catch (IOException ioe) {
					_log.error("Could not run scheduled commit", ioe);
				}
			}

		};

		scheduledExecutorService.scheduleWithFixedDelay(
			runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
			TimeUnit.MILLISECONDS);
	}

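	// Derive the Lucene JDBC dialect from the subprotocol of the connection
	// URL (for example "mysql" in "jdbc:mysql://..."), then look up the
	// matching dialect class in portal properties.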
	private void _initDialect() {
		if (!PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_JDBC)) {
			return;
		}

		Connection con = null;

		try {
			con = DataAccess.getConnection();

			String url = con.getMetaData().getURL();

			int x = url.indexOf(CharPool.COLON);
			int y = url.indexOf(CharPool.COLON, x + 1);

			String urlPrefix = url.substring(x + 1, y);

			String dialectClass = PropsUtil.get(
				PropsKeys.LUCENE_STORE_JDBC_DIALECT + urlPrefix);

			if (dialectClass != null) {
				if (_log.isDebugEnabled()) {
					_log.debug("JDBC class implementation " + dialectClass);
				}

				_dialect = (Dialect)Class.forName(dialectClass).newInstance();
			}
			else if (_log.isDebugEnabled()) {
				_log.debug("JDBC class implementation is null");
			}
		}
		catch (Exception e) {
			_log.error(e);
		}
		finally {
			DataAccess.cleanUp(con);
		}

		if (_dialect == null) {
			_log.error("No JDBC dialect found");
		}
	}

	private void _initIndexWriter() {
		try {
			_indexWriter = new IndexWriter(
				getLuceneDir(), LuceneHelperUtil.getAnalyzer(),
				_dumpIndexDeletionPolicy, IndexWriter.MaxFieldLength.LIMITED);

			_indexWriter.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
			_indexWriter.setRAMBufferSizeMB(PropsValues.LUCENE_BUFFER_SIZE);
		}
		catch (Exception e) {
			_log.error(
				"Initializing Lucene writer failed for " + _companyId, e);
		}
	}

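	// Fallback for databases whose driver throws
	// UnsupportedOperationException from tableExists(): consult
	// DatabaseMetaData directly and, if the table is missing, create it and
	// initialize the JDBC lock support.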
	private void _manuallyCreateJdbcDirectory(
		JdbcDirectory jdbcDirectory, String tableName) {

		Connection con = null;
		ResultSet rs = null;

		try {
			con = DataAccess.getConnection();

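			// Check whether the table already exists before creating it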

			DatabaseMetaData databaseMetaData = con.getMetaData();

			rs = databaseMetaData.getTables(null, null, tableName, null);

			if (!rs.next()) {
				JdbcTemplate jdbcTemplate = jdbcDirectory.getJdbcTemplate();

				jdbcTemplate.executeUpdate(
					jdbcDirectory.getTable().sqlCreate());

				Class<?> lockClass = jdbcDirectory.getSettings().getLockClass();

				JdbcLock jdbcLock = null;

				try {
					jdbcLock = (JdbcLock)lockClass.newInstance();
				}
				catch (Exception e) {
					throw new JdbcStoreException(
						"Could not create lock class " + lockClass);
				}

				jdbcLock.initializeDatabase(jdbcDirectory);
			}
		}
		catch (Exception e) {
			if (_log.isWarnEnabled()) {
				_log.warn("Could not create " + tableName, e);
			}
		}
		finally {
			DataAccess.cleanUp(con, null, rs);
		}
	}

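	// All writes funnel through here: apply the add or update, optimize the
	// index every LUCENE_OPTIMIZE_INTERVAL writes, and let _commit() decide
	// whether the batch is large enough to flush.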
	private void _write(Term term, Document document) throws IOException {
		try {
			if (term != null) {
				_indexWriter.updateDocument(term, document);
			}
			else {
				_indexWriter.addDocument(document);
			}

			_optimizeCount++;

			if ((PropsValues.LUCENE_OPTIMIZE_INTERVAL == 0) ||
				(_optimizeCount >= PropsValues.LUCENE_OPTIMIZE_INTERVAL)) {

				_indexWriter.optimize();

				_optimizeCount = 0;
			}

			_batchCount++;
		}
		finally {
			_commit();
		}
	}

	private static final String _LUCENE_STORE_TYPE_FILE = "file";

	private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";

	private static final String _LUCENE_STORE_TYPE_RAM = "ram";

	private static final String _LUCENE_TABLE_PREFIX = "LUCENE_";

	private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);

	private volatile int _batchCount;
	private Lock _commitLock = new ReentrantLock();
	private long _companyId;
	private CountDownLatch _countDownLatch = new CountDownLatch(1);
	private Dialect _dialect;
	private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
		new DumpIndexDeletionPolicy();
	private IndexWriter _indexWriter;
	private Map<String, Directory> _jdbcDirectories =
		new ConcurrentHashMap<String, Directory>();
	private int _optimizeCount;
	private Map<String, Directory> _ramDirectories =
		new ConcurrentHashMap<String, Directory>();

}