001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerState;
039 import com.liferay.portal.kernel.scheduler.TriggerType;
040 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
042 import com.liferay.portal.kernel.util.CharPool;
043 import com.liferay.portal.kernel.util.ClassLoaderPool;
044 import com.liferay.portal.kernel.util.ProxyUtil;
045 import com.liferay.portal.kernel.util.Validator;
046 import com.liferay.portal.scheduler.job.MessageSenderJob;
047 import com.liferay.portal.service.QuartzLocalService;
048 import com.liferay.portal.util.ClassLoaderUtil;
049 import com.liferay.portal.util.PropsUtil;
050 import com.liferay.portal.util.PropsValues;
051
052 import java.util.ArrayList;
053 import java.util.Collections;
054 import java.util.Date;
055 import java.util.List;
056 import java.util.Map;
057 import java.util.Properties;
058 import java.util.Set;
059
060 import org.quartz.CronScheduleBuilder;
061 import org.quartz.CronTrigger;
062 import org.quartz.JobBuilder;
063 import org.quartz.JobDataMap;
064 import org.quartz.JobDetail;
065 import org.quartz.JobKey;
066 import org.quartz.ObjectAlreadyExistsException;
067 import org.quartz.Scheduler;
068 import org.quartz.SimpleScheduleBuilder;
069 import org.quartz.SimpleTrigger;
070 import org.quartz.Trigger;
071 import org.quartz.TriggerBuilder;
072 import org.quartz.TriggerKey;
073 import org.quartz.impl.StdSchedulerFactory;
074 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
075 import org.quartz.impl.matchers.GroupMatcher;
076
077
084 public class QuartzSchedulerEngine implements SchedulerEngine {
085
086 public void afterPropertiesSet() {
087 if (!PropsValues.SCHEDULER_ENABLED) {
088 return;
089 }
090
091 try {
092 quartzLocalService.checkQuartzTables();
093
094 _persistedScheduler = initializeScheduler(
095 "persisted.scheduler.", true);
096
097 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
098 }
099 catch (Exception e) {
100 _log.error("Unable to initialize engine", e);
101 }
102 }
103
104 @Override
105 public void delete(String groupName, StorageType storageType)
106 throws SchedulerException {
107
108 if (!PropsValues.SCHEDULER_ENABLED) {
109 return;
110 }
111
112 try {
113 Scheduler scheduler = getScheduler(storageType);
114
115 groupName = fixMaxLength(
116 groupName, GROUP_NAME_MAX_LENGTH, storageType);
117
118 Set<JobKey> jobKeys = scheduler.getJobKeys(
119 GroupMatcher.jobGroupEquals(groupName));
120
121 for (JobKey jobKey : jobKeys) {
122 unregisterMessageListener(scheduler, jobKey);
123
124 scheduler.deleteJob(jobKey);
125 }
126 }
127 catch (Exception e) {
128 throw new SchedulerException(
129 "Unable to delete jobs in group " + groupName, e);
130 }
131 }
132
133 @Override
134 public void delete(
135 String jobName, String groupName, StorageType storageType)
136 throws SchedulerException {
137
138 if (!PropsValues.SCHEDULER_ENABLED) {
139 return;
140 }
141
142 try {
143 Scheduler scheduler = getScheduler(storageType);
144
145 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
146 groupName = fixMaxLength(
147 groupName, GROUP_NAME_MAX_LENGTH, storageType);
148
149 JobKey jobKey = new JobKey(jobName, groupName);
150
151 unregisterMessageListener(scheduler, jobKey);
152
153 scheduler.deleteJob(jobKey);
154 }
155 catch (Exception e) {
156 throw new SchedulerException(
157 "Unable to delete job {jobName=" + jobName + ", groupName=" +
158 groupName + "}",
159 e);
160 }
161 }
162
163 public void destroy() {
164 try {
165 shutdown();
166 }
167 catch (SchedulerException se) {
168 if (_log.isWarnEnabled()) {
169 _log.warn("Unable to shutdown", se);
170 }
171 }
172 }
173
174 @Override
175 public SchedulerResponse getScheduledJob(
176 String jobName, String groupName, StorageType storageType)
177 throws SchedulerException {
178
179 if (!PropsValues.SCHEDULER_ENABLED) {
180 return null;
181 }
182
183 try {
184 Scheduler scheduler = getScheduler(storageType);
185
186 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
187 groupName = fixMaxLength(
188 groupName, GROUP_NAME_MAX_LENGTH, storageType);
189
190 JobKey jobKey = new JobKey(jobName, groupName);
191
192 return getScheduledJob(scheduler, jobKey);
193 }
194 catch (Exception e) {
195 throw new SchedulerException(
196 "Unable to get job {jobName=" + jobName + ", groupName=" +
197 groupName + "}",
198 e);
199 }
200 }
201
202 @Override
203 public List<SchedulerResponse> getScheduledJobs()
204 throws SchedulerException {
205
206 if (!PropsValues.SCHEDULER_ENABLED) {
207 return Collections.emptyList();
208 }
209
210 try {
211 List<String> groupNames = _persistedScheduler.getJobGroupNames();
212
213 List<SchedulerResponse> schedulerResponses =
214 new ArrayList<SchedulerResponse>();
215
216 for (String groupName : groupNames) {
217 schedulerResponses.addAll(
218 getScheduledJobs(_persistedScheduler, groupName, null));
219 }
220
221 groupNames = _memoryScheduler.getJobGroupNames();
222
223 for (String groupName : groupNames) {
224 schedulerResponses.addAll(
225 getScheduledJobs(_memoryScheduler, groupName, null));
226 }
227
228 return schedulerResponses;
229 }
230 catch (Exception e) {
231 throw new SchedulerException("Unable to get jobs", e);
232 }
233 }
234
235 @Override
236 public List<SchedulerResponse> getScheduledJobs(StorageType storageType)
237 throws SchedulerException {
238
239 if (!PropsValues.SCHEDULER_ENABLED) {
240 return Collections.emptyList();
241 }
242
243 try {
244 Scheduler scheduler = getScheduler(storageType);
245
246 List<String> groupNames = scheduler.getJobGroupNames();
247
248 List<SchedulerResponse> schedulerResponses =
249 new ArrayList<SchedulerResponse>();
250
251 for (String groupName : groupNames) {
252 schedulerResponses.addAll(
253 getScheduledJobs(scheduler, groupName, storageType));
254 }
255
256 return schedulerResponses;
257 }
258 catch (Exception e) {
259 throw new SchedulerException(
260 "Unable to get jobs with type " + storageType, e);
261 }
262 }
263
264 @Override
265 public List<SchedulerResponse> getScheduledJobs(
266 String groupName, StorageType storageType)
267 throws SchedulerException {
268
269 if (!PropsValues.SCHEDULER_ENABLED) {
270 return Collections.emptyList();
271 }
272
273 try {
274 Scheduler scheduler = getScheduler(storageType);
275
276 return getScheduledJobs(scheduler, groupName, storageType);
277 }
278 catch (Exception e) {
279 throw new SchedulerException(
280 "Unable to get jobs in group " + groupName, e);
281 }
282 }
283
284 @Override
285 public void pause(String groupName, StorageType storageType)
286 throws SchedulerException {
287
288 if (!PropsValues.SCHEDULER_ENABLED) {
289 return;
290 }
291
292 try {
293 Scheduler scheduler = getScheduler(storageType);
294
295 groupName = fixMaxLength(
296 groupName, GROUP_NAME_MAX_LENGTH, storageType);
297
298 Set<JobKey> jobKeys = scheduler.getJobKeys(
299 GroupMatcher.jobGroupEquals(groupName));
300
301 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
302
303 for (JobKey jobKey : jobKeys) {
304 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
305 }
306 }
307 catch (Exception e) {
308 throw new SchedulerException(
309 "Unable to pause jobs in group " + groupName, e);
310 }
311 }
312
313 @Override
314 public void pause(String jobName, String groupName, StorageType storageType)
315 throws SchedulerException {
316
317 if (!PropsValues.SCHEDULER_ENABLED) {
318 return;
319 }
320
321 try {
322 Scheduler scheduler = getScheduler(storageType);
323
324 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
325 groupName = fixMaxLength(
326 groupName, GROUP_NAME_MAX_LENGTH, storageType);
327
328 JobKey jobKey = new JobKey(jobName, groupName);
329
330 scheduler.pauseJob(jobKey);
331
332 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
333 }
334 catch (Exception e) {
335 throw new SchedulerException(
336 "Unable to pause job {jobName=" + jobName + ", groupName=" +
337 groupName + "}",
338 e);
339 }
340 }
341
342 @Override
343 public void resume(String groupName, StorageType storageType)
344 throws SchedulerException {
345
346 if (!PropsValues.SCHEDULER_ENABLED) {
347 return;
348 }
349
350 try {
351 Scheduler scheduler = getScheduler(storageType);
352
353 groupName = fixMaxLength(
354 groupName, GROUP_NAME_MAX_LENGTH, storageType);
355
356 Set<JobKey> jobKeys = scheduler.getJobKeys(
357 GroupMatcher.jobGroupEquals(groupName));
358
359 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
360
361 for (JobKey jobKey : jobKeys) {
362 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
363 }
364 }
365 catch (Exception e) {
366 throw new SchedulerException(
367 "Unable to resume jobs in group " + groupName, e);
368 }
369 }
370
371 @Override
372 public void resume(
373 String jobName, String groupName, StorageType storageType)
374 throws SchedulerException {
375
376 if (!PropsValues.SCHEDULER_ENABLED) {
377 return;
378 }
379
380 try {
381 Scheduler scheduler = getScheduler(storageType);
382
383 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
384 groupName = fixMaxLength(
385 groupName, GROUP_NAME_MAX_LENGTH, storageType);
386
387 JobKey jobKey = new JobKey(jobName, groupName);
388
389 scheduler.resumeJob(jobKey);
390
391 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
392 }
393 catch (Exception e) {
394 throw new SchedulerException(
395 "Unable to resume job {jobName=" + jobName + ", groupName=" +
396 groupName + "}",
397 e);
398 }
399 }
400
401 @Override
402 public void schedule(
403 com.liferay.portal.kernel.scheduler.Trigger trigger,
404 String description, String destination, Message message,
405 StorageType storageType)
406 throws SchedulerException {
407
408 if (!PropsValues.SCHEDULER_ENABLED) {
409 return;
410 }
411
412 try {
413 Scheduler scheduler = getScheduler(storageType);
414
415 Trigger quartzTrigger = getQuartzTrigger(trigger, storageType);
416
417 if (quartzTrigger == null) {
418 return;
419 }
420
421 description = fixMaxLength(
422 description, DESCRIPTION_MAX_LENGTH, storageType);
423
424 if (message == null) {
425 message = new Message();
426 }
427 else {
428 message = message.clone();
429 }
430
431 registerMessageListeners(destination, message);
432
433 schedule(
434 scheduler, storageType, quartzTrigger, description, destination,
435 message);
436 }
437 catch (RuntimeException re) {
438
439
440
441
442 }
443 catch (Exception e) {
444 throw new SchedulerException("Unable to schedule job", e);
445 }
446 }
447
448 @Override
449 public void shutdown() throws SchedulerException {
450 if (!PropsValues.SCHEDULER_ENABLED) {
451 return;
452 }
453
454 try {
455 if (!_persistedScheduler.isShutdown()) {
456 _persistedScheduler.shutdown(false);
457 }
458
459 if (!_memoryScheduler.isShutdown()) {
460 _memoryScheduler.shutdown(false);
461 }
462 }
463 catch (Exception e) {
464 throw new SchedulerException("Unable to shutdown scheduler", e);
465 }
466 }
467
468 @Override
469 public void start() throws SchedulerException {
470 if (!PropsValues.SCHEDULER_ENABLED) {
471 return;
472 }
473
474 try {
475 _persistedScheduler.start();
476
477 initJobState();
478
479 _memoryScheduler.start();
480 }
481 catch (Exception e) {
482 throw new SchedulerException("Unable to start scheduler", e);
483 }
484 }
485
486 @Override
487 public void suppressError(
488 String jobName, String groupName, StorageType storageType)
489 throws SchedulerException {
490
491 if (!PropsValues.SCHEDULER_ENABLED) {
492 return;
493 }
494
495 try {
496 Scheduler scheduler = getScheduler(storageType);
497
498 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
499 groupName = fixMaxLength(
500 groupName, GROUP_NAME_MAX_LENGTH, storageType);
501
502 JobKey jobKey = new JobKey(jobName, groupName);
503
504 updateJobState(scheduler, jobKey, null, true);
505 }
506 catch (Exception e) {
507 throw new SchedulerException(
508 "Unable to suppress error for job {jobName=" + jobName +
509 ", groupName=" + groupName + "}",
510 e);
511 }
512 }
513
514 @Override
515 public void unschedule(String groupName, StorageType storageType)
516 throws SchedulerException {
517
518 if (!PropsValues.SCHEDULER_ENABLED) {
519 return;
520 }
521
522 try {
523 Scheduler scheduler = getScheduler(storageType);
524
525 groupName = fixMaxLength(
526 groupName, GROUP_NAME_MAX_LENGTH, storageType);
527
528 Set<JobKey> jobKeys = scheduler.getJobKeys(
529 GroupMatcher.jobGroupEquals(groupName));
530
531 for (JobKey jobKey : jobKeys) {
532 unschedule(scheduler, jobKey);
533 }
534 }
535 catch (Exception e) {
536 throw new SchedulerException(
537 "Unable to unschedule jobs in group " + groupName, e);
538 }
539 }
540
541 @Override
542 public void unschedule(
543 String jobName, String groupName, StorageType storageType)
544 throws SchedulerException {
545
546 if (!PropsValues.SCHEDULER_ENABLED) {
547 return;
548 }
549
550 try {
551 Scheduler scheduler = getScheduler(storageType);
552
553 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
554 groupName = fixMaxLength(
555 groupName, GROUP_NAME_MAX_LENGTH, storageType);
556
557 JobKey jobKey = new JobKey(jobName, groupName);
558
559 unschedule(scheduler, jobKey);
560 }
561 catch (Exception e) {
562 throw new SchedulerException(
563 "Unable to unschedule job {jobName=" + jobName +
564 ", groupName=" + groupName + "}",
565 e);
566 }
567 }
568
569 @Override
570 public void update(
571 com.liferay.portal.kernel.scheduler.Trigger trigger,
572 StorageType storageType)
573 throws SchedulerException {
574
575 if (!PropsValues.SCHEDULER_ENABLED) {
576 return;
577 }
578
579 try {
580 Scheduler scheduler = getScheduler(storageType);
581
582 update(scheduler, trigger, storageType);
583 }
584 catch (Exception e) {
585 throw new SchedulerException("Unable to update trigger", e);
586 }
587 }
588
589 protected String fixMaxLength(
590 String argument, int maxLength, StorageType storageType) {
591
592 if ((argument == null) || (storageType != StorageType.PERSISTED)) {
593 return argument;
594 }
595
596 if (argument.length() > maxLength) {
597 argument = argument.substring(0, maxLength);
598 }
599
600 return argument;
601 }
602
603 protected JobState getJobState(JobDataMap jobDataMap) {
604 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
605 JOB_STATE);
606
607 return JobStateSerializeUtil.deserialize(jobStateMap);
608 }
609
610 protected Message getMessage(JobDataMap jobDataMap) {
611 String messageJSON = (String)jobDataMap.get(MESSAGE);
612
613 return (Message)JSONFactoryUtil.deserialize(messageJSON);
614 }
615
616 protected MessageListener getMessageListener(
617 String messageListenerClassName, ClassLoader classLoader)
618 throws SchedulerException {
619
620 MessageListener schedulerEventListener = null;
621
622 try {
623 Class<? extends MessageListener> clazz =
624 (Class<? extends MessageListener>)classLoader.loadClass(
625 messageListenerClassName);
626
627 schedulerEventListener = clazz.newInstance();
628
629 schedulerEventListener =
630 (MessageListener)ProxyUtil.newProxyInstance(
631 classLoader, new Class<?>[] {MessageListener.class},
632 new ClassLoaderBeanHandler(
633 schedulerEventListener, classLoader));
634 }
635 catch (Exception e) {
636 throw new SchedulerException(
637 "Unable to register message listener with name " +
638 messageListenerClassName,
639 e);
640 }
641
642 return schedulerEventListener;
643 }
644
645 protected Trigger getQuartzTrigger(
646 com.liferay.portal.kernel.scheduler.Trigger trigger,
647 StorageType storageType)
648 throws SchedulerException {
649
650 if (trigger == null) {
651 return null;
652 }
653
654 Date endDate = trigger.getEndDate();
655 String jobName = fixMaxLength(
656 trigger.getJobName(), JOB_NAME_MAX_LENGTH, storageType);
657 String groupName = fixMaxLength(
658 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH, storageType);
659
660 Date startDate = trigger.getStartDate();
661
662 if (startDate == null) {
663 startDate = new Date(System.currentTimeMillis());
664 }
665
666 Trigger quartzTrigger = null;
667
668 TriggerType triggerType = trigger.getTriggerType();
669
670 if (triggerType.equals(TriggerType.CRON)) {
671 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
672
673 triggerBuilder.endAt(endDate);
674 triggerBuilder.forJob(jobName, groupName);
675 triggerBuilder.startAt(startDate);
676 triggerBuilder.withIdentity(jobName, groupName);
677
678 CronScheduleBuilder cronScheduleBuilder =
679 CronScheduleBuilder.cronSchedule(
680 (String)trigger.getTriggerContent());
681
682 triggerBuilder.withSchedule(cronScheduleBuilder);
683
684 quartzTrigger = triggerBuilder.build();
685 }
686 else if (triggerType.equals(TriggerType.SIMPLE)) {
687 long interval = (Long)trigger.getTriggerContent();
688
689 if (interval <= 0) {
690 if (_log.isDebugEnabled()) {
691 _log.debug(
692 "Not scheduling " + trigger.getJobName() +
693 " because interval is less than or equal to 0");
694 }
695
696 return null;
697 }
698
699 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
700
701 triggerBuilder.endAt(endDate);
702 triggerBuilder.forJob(jobName, groupName);
703 triggerBuilder.startAt(startDate);
704 triggerBuilder.withIdentity(jobName, groupName);
705
706 SimpleScheduleBuilder simpleScheduleBuilder =
707 SimpleScheduleBuilder.simpleSchedule();
708
709 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
710 simpleScheduleBuilder.withRepeatCount(
711 SimpleTrigger.REPEAT_INDEFINITELY);
712
713 triggerBuilder.withSchedule(simpleScheduleBuilder);
714
715 quartzTrigger = triggerBuilder.build();
716 }
717 else {
718 throw new SchedulerException(
719 "Unknown trigger type " + trigger.getTriggerType());
720 }
721
722 return quartzTrigger;
723 }
724
725 protected SchedulerResponse getScheduledJob(
726 Scheduler scheduler, JobKey jobKey)
727 throws Exception {
728
729 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
730
731 if (jobDetail == null) {
732 return null;
733 }
734
735 JobDataMap jobDataMap = jobDetail.getJobDataMap();
736
737 String description = jobDataMap.getString(DESCRIPTION);
738 String destinationName = jobDataMap.getString(DESTINATION_NAME);
739 Message message = getMessage(jobDataMap);
740 StorageType storageType = StorageType.valueOf(
741 jobDataMap.getString(STORAGE_TYPE));
742
743 SchedulerResponse schedulerResponse = null;
744
745 String jobName = jobKey.getName();
746 String groupName = jobKey.getGroup();
747
748 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
749
750 Trigger trigger = scheduler.getTrigger(triggerKey);
751
752 JobState jobState = getJobState(jobDataMap);
753
754 message.put(JOB_STATE, jobState);
755
756 if (trigger == null) {
757 schedulerResponse = new SchedulerResponse();
758
759 schedulerResponse.setDescription(description);
760 schedulerResponse.setDestinationName(destinationName);
761 schedulerResponse.setGroupName(groupName);
762 schedulerResponse.setJobName(jobName);
763 schedulerResponse.setMessage(message);
764 schedulerResponse.setStorageType(storageType);
765 }
766 else {
767 message.put(END_TIME, trigger.getEndTime());
768 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
769 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
770 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
771 message.put(START_TIME, trigger.getStartTime());
772
773 if (trigger instanceof CronTrigger) {
774 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
775
776 schedulerResponse = new SchedulerResponse();
777
778 schedulerResponse.setDescription(description);
779 schedulerResponse.setDestinationName(destinationName);
780 schedulerResponse.setMessage(message);
781 schedulerResponse.setStorageType(storageType);
782 schedulerResponse.setTrigger(
783 new com.liferay.portal.kernel.scheduler.CronTrigger(
784 jobName, groupName, cronTrigger.getStartTime(),
785 cronTrigger.getEndTime(),
786 cronTrigger.getCronExpression()));
787 }
788 else if (trigger instanceof SimpleTrigger) {
789 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
790
791 schedulerResponse = new SchedulerResponse();
792
793 schedulerResponse.setDescription(description);
794 schedulerResponse.setDestinationName(destinationName);
795 schedulerResponse.setMessage(message);
796 schedulerResponse.setStorageType(storageType);
797 schedulerResponse.setTrigger(
798 new IntervalTrigger(
799 jobName, groupName, simpleTrigger.getStartTime(),
800 simpleTrigger.getEndTime(),
801 simpleTrigger.getRepeatInterval()));
802 }
803 }
804
805 return schedulerResponse;
806 }
807
808 protected List<SchedulerResponse> getScheduledJobs(
809 Scheduler scheduler, String groupName, StorageType storageType)
810 throws Exception {
811
812 groupName = fixMaxLength(groupName, GROUP_NAME_MAX_LENGTH, storageType);
813
814 List<SchedulerResponse> schedulerResponses =
815 new ArrayList<SchedulerResponse>();
816
817 Set<JobKey> jobKeys = scheduler.getJobKeys(
818 GroupMatcher.jobGroupEquals(groupName));
819
820 for (JobKey jobKey : jobKeys) {
821 SchedulerResponse schedulerResponse = getScheduledJob(
822 scheduler, jobKey);
823
824 if ((schedulerResponse != null) &&
825 ((storageType == null) ||
826 (storageType == schedulerResponse.getStorageType()))) {
827
828 schedulerResponses.add(schedulerResponse);
829 }
830 }
831
832 return schedulerResponses;
833 }
834
835 protected Scheduler getScheduler(StorageType storageType) {
836 if (storageType == StorageType.PERSISTED) {
837 return _persistedScheduler;
838 }
839 else {
840 return _memoryScheduler;
841 }
842 }
843
844 protected StorageType getStorageType(String groupName) {
845 int pos = groupName.indexOf(CharPool.POUND);
846
847 String storageTypeString = groupName.substring(0, pos);
848
849 return StorageType.valueOf(storageTypeString);
850 }
851
852 protected Scheduler initializeScheduler(
853 String propertiesPrefix, boolean useQuartzCluster)
854 throws Exception {
855
856 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
857
858 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
859
860 if (useQuartzCluster) {
861 DB db = DBFactoryUtil.getDB();
862
863 String dbType = db.getType();
864
865 if (dbType.equals(DB.TYPE_SQLSERVER)) {
866 String lockHandlerClassName = properties.getProperty(
867 "org.quartz.jobStore.lockHandler.class");
868
869 if (Validator.isNull(lockHandlerClassName)) {
870 properties.setProperty(
871 "org.quartz.jobStore.lockHandler.class",
872 UpdateLockRowSemaphore.class.getName());
873 }
874 }
875
876 if (PropsValues.CLUSTER_LINK_ENABLED) {
877 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
878 _log.error("Unable to cluster scheduler on Hypersonic");
879 }
880 else {
881 properties.put(
882 "org.quartz.jobStore.isClustered",
883 Boolean.TRUE.toString());
884 }
885 }
886 }
887
888 schedulerFactory.initialize(properties);
889
890 return schedulerFactory.getScheduler();
891 }
892
893 protected void initJobState() throws Exception {
894 List<String> groupNames = _persistedScheduler.getJobGroupNames();
895
896 for (String groupName : groupNames) {
897 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
898 GroupMatcher.jobGroupEquals(groupName));
899
900 for (JobKey jobKey : jobkeys) {
901 Trigger trigger = _persistedScheduler.getTrigger(
902 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
903
904 if (trigger != null) {
905 continue;
906 }
907
908 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
909
910 JobDataMap jobDataMap = jobDetail.getJobDataMap();
911
912 Message message = getMessage(jobDataMap);
913
914 message.put(JOB_NAME, jobKey.getName());
915 message.put(GROUP_NAME, jobKey.getGroup());
916
917 SchedulerEngineHelperUtil.auditSchedulerJobs(
918 message, TriggerState.EXPIRED);
919
920 _persistedScheduler.deleteJob(jobKey);
921 }
922 }
923 }
924
925 protected void registerMessageListeners(
926 String destinationName, Message message)
927 throws SchedulerException {
928
929 String messageListenerClassName = message.getString(
930 MESSAGE_LISTENER_CLASS_NAME);
931
932 if (Validator.isNull(messageListenerClassName)) {
933 return;
934 }
935
936 String portletId = message.getString(PORTLET_ID);
937
938 ClassLoader classLoader = null;
939
940 if (Validator.isNull(portletId)) {
941 classLoader = ClassLoaderUtil.getPortalClassLoader();
942 }
943 else {
944 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
945
946 if (classLoader == null) {
947
948
949
950
951
952 classLoader = ClassLoaderPool.getClassLoader(portletId);
953 }
954 }
955
956 if (classLoader == null) {
957 throw new SchedulerException(
958 "Unable to find class loader for portlet " + portletId);
959 }
960
961 MessageListener schedulerEventListener = getMessageListener(
962 messageListenerClassName, classLoader);
963
964 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
965 new SchedulerEventMessageListenerWrapper();
966
967 schedulerEventListenerWrapper.setMessageListener(
968 schedulerEventListener);
969
970 schedulerEventListenerWrapper.afterPropertiesSet();
971
972 MessageBusUtil.registerMessageListener(
973 destinationName, schedulerEventListenerWrapper);
974
975 message.put(
976 MESSAGE_LISTENER_UUID,
977 schedulerEventListenerWrapper.getMessageListenerUUID());
978 }
979
980 protected void schedule(
981 Scheduler scheduler, StorageType storageType, Trigger trigger,
982 String description, String destinationName, Message message)
983 throws Exception {
984
985 try {
986 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
987
988 jobBuilder.withIdentity(trigger.getJobKey());
989
990 jobBuilder.storeDurably();
991
992 JobDetail jobDetail = jobBuilder.build();
993
994 JobDataMap jobDataMap = jobDetail.getJobDataMap();
995
996 jobDataMap.put(DESCRIPTION, description);
997 jobDataMap.put(DESTINATION_NAME, destinationName);
998 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
999 jobDataMap.put(STORAGE_TYPE, storageType.toString());
1000
1001 JobState jobState = new JobState(
1002 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
1003
1004 jobDataMap.put(
1005 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1006
1007 unregisterMessageListener(scheduler, trigger.getJobKey());
1008
1009 synchronized (this) {
1010 scheduler.deleteJob(trigger.getJobKey());
1011 scheduler.scheduleJob(jobDetail, trigger);
1012 }
1013 }
1014 catch (ObjectAlreadyExistsException oaee) {
1015 if (_log.isInfoEnabled()) {
1016 _log.info("Message is already scheduled");
1017 }
1018 }
1019 }
1020
1021 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
1022 throws Exception {
1023
1024 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1025
1026 if (jobDetail == null) {
1027 return;
1028 }
1029
1030 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1031
1032 if (jobDataMap == null) {
1033 return;
1034 }
1035
1036 Message message = getMessage(jobDataMap);
1037
1038 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1039
1040 if (messageListenerUUID == null) {
1041 return;
1042 }
1043
1044 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1045
1046 MessageBus messageBus = MessageBusUtil.getMessageBus();
1047
1048 Destination destination = messageBus.getDestination(destinationName);
1049
1050 if (destination == null) {
1051 return;
1052 }
1053
1054 Set<MessageListener> messageListeners =
1055 destination.getMessageListeners();
1056
1057 for (MessageListener messageListener : messageListeners) {
1058 if (!(messageListener instanceof InvokerMessageListener)) {
1059 continue;
1060 }
1061
1062 InvokerMessageListener invokerMessageListener =
1063 (InvokerMessageListener)messageListener;
1064
1065 messageListener = invokerMessageListener.getMessageListener();
1066
1067 if (!(messageListener instanceof
1068 SchedulerEventMessageListenerWrapper)) {
1069
1070 continue;
1071 }
1072
1073 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1074 (SchedulerEventMessageListenerWrapper)messageListener;
1075
1076 if (messageListenerUUID.equals(
1077 schedulerMessageListener.getMessageListenerUUID())) {
1078
1079 messageBus.unregisterMessageListener(
1080 destinationName, schedulerMessageListener);
1081
1082 return;
1083 }
1084 }
1085 }
1086
1087 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1088 throws Exception {
1089
1090 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1091
1092 TriggerKey triggerKey = new TriggerKey(
1093 jobKey.getName(), jobKey.getGroup());
1094
1095 if (jobDetail == null) {
1096 return;
1097 }
1098
1099 unregisterMessageListener(scheduler, jobKey);
1100
1101 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1102
1103 JobState jobState = getJobState(jobDataMap);
1104
1105 Trigger trigger = scheduler.getTrigger(triggerKey);
1106
1107 jobState.setTriggerDate(END_TIME, new Date());
1108 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1109 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1110 jobState.setTriggerDate(
1111 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1112 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1113
1114 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1115
1116 jobState.clearExceptions();
1117
1118 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1119
1120 scheduler.unscheduleJob(triggerKey);
1121
1122 scheduler.addJob(jobDetail, true);
1123 }
1124
1125 protected void update(
1126 Scheduler scheduler,
1127 com.liferay.portal.kernel.scheduler.Trigger trigger,
1128 StorageType storageType)
1129 throws Exception {
1130
1131 Trigger quartzTrigger = getQuartzTrigger(trigger, storageType);
1132
1133 if (quartzTrigger == null) {
1134 return;
1135 }
1136
1137 TriggerKey triggerKey = quartzTrigger.getKey();
1138
1139 if (scheduler.getTrigger(triggerKey) != null) {
1140 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1141 }
1142 else {
1143 JobKey jobKey = quartzTrigger.getJobKey();
1144
1145 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1146
1147 if (jobDetail == null) {
1148 return;
1149 }
1150
1151 synchronized (this) {
1152 scheduler.deleteJob(jobKey);
1153 scheduler.scheduleJob(jobDetail, quartzTrigger);
1154 }
1155
1156 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1157 }
1158 }
1159
1160 protected void updateJobState(
1161 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1162 boolean suppressError)
1163 throws Exception {
1164
1165 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1166
1167 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1168
1169 JobState jobState = getJobState(jobDataMap);
1170
1171 if (triggerState != null) {
1172 jobState.setTriggerState(triggerState);
1173 }
1174
1175 if (suppressError) {
1176 jobState.clearExceptions();
1177 }
1178
1179 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1180
1181 scheduler.addJob(jobDetail, true);
1182 }
1183
1184 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1185 protected QuartzLocalService quartzLocalService;
1186
1187 private static final Log _log = LogFactoryUtil.getLog(
1188 QuartzSchedulerEngine.class);
1189
1190 private Scheduler _memoryScheduler;
1191 private Scheduler _persistedScheduler;
1192
1193 }