001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.TriggerType;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.CharPool;
044 import com.liferay.portal.kernel.util.ClassLoaderPool;
045 import com.liferay.portal.kernel.util.ProxyUtil;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.scheduler.job.MessageSenderJob;
049 import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
050 import com.liferay.portal.service.QuartzLocalService;
051 import com.liferay.portal.util.PropsUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.text.ParseException;
055
056 import java.util.ArrayList;
057 import java.util.Collections;
058 import java.util.Date;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Properties;
062 import java.util.Set;
063
064 import org.quartz.CronScheduleBuilder;
065 import org.quartz.CronTrigger;
066 import org.quartz.JobBuilder;
067 import org.quartz.JobDataMap;
068 import org.quartz.JobDetail;
069 import org.quartz.JobKey;
070 import org.quartz.ObjectAlreadyExistsException;
071 import org.quartz.Scheduler;
072 import org.quartz.SimpleScheduleBuilder;
073 import org.quartz.SimpleTrigger;
074 import org.quartz.Trigger;
075 import org.quartz.TriggerBuilder;
076 import org.quartz.TriggerKey;
077 import org.quartz.impl.StdSchedulerFactory;
078 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
079 import org.quartz.impl.matchers.GroupMatcher;
080
081
088 public class QuartzSchedulerEngine implements SchedulerEngine {
089
090 public void afterPropertiesSet() {
091 if (!PropsValues.SCHEDULER_ENABLED) {
092 return;
093 }
094
095 try {
096 quartzLocalService.checkQuartzTables();
097
098 _persistedScheduler = initializeScheduler(
099 "persisted.scheduler.", true);
100
101 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
102 }
103 catch (Exception e) {
104 _log.error("Unable to initialize engine", e);
105 }
106 }
107
108 public void delete(String groupName) throws SchedulerException {
109 if (!PropsValues.SCHEDULER_ENABLED) {
110 return;
111 }
112
113 try {
114 Scheduler scheduler = getScheduler(groupName);
115
116 groupName = fixMaxLength(
117 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
118
119 Set<JobKey> jobKeys = scheduler.getJobKeys(
120 GroupMatcher.jobGroupEquals(groupName));
121
122 for (JobKey jobKey : jobKeys) {
123 unregisterMessageListener(scheduler, jobKey);
124
125 scheduler.deleteJob(jobKey);
126 }
127 }
128 catch (Exception e) {
129 throw new SchedulerException(
130 "Unable to delete jobs in group " + groupName, e);
131 }
132 }
133
134 public void delete(String jobName, String groupName)
135 throws SchedulerException {
136
137 if (!PropsValues.SCHEDULER_ENABLED) {
138 return;
139 }
140
141 try {
142 Scheduler scheduler = getScheduler(groupName);
143
144 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
145 groupName = fixMaxLength(
146 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
147
148 JobKey jobKey = new JobKey(jobName, groupName);
149
150 unregisterMessageListener(scheduler, jobKey);
151
152 scheduler.deleteJob(jobKey);
153 }
154 catch (Exception e) {
155 throw new SchedulerException(
156 "Unable to delete job {jobName=" + jobName + ", groupName=" +
157 groupName + "}",
158 e);
159 }
160 }
161
162 public void destroy() {
163 try {
164 shutdown();
165 }
166 catch (SchedulerException se) {
167 if (_log.isWarnEnabled()) {
168 _log.warn("Unable to shutdown", se);
169 }
170 }
171 }
172
173 public SchedulerResponse getScheduledJob(String jobName, String groupName)
174 throws SchedulerException {
175
176 if (!PropsValues.SCHEDULER_ENABLED) {
177 return null;
178 }
179
180 try {
181 Scheduler scheduler = getScheduler(groupName);
182
183 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
184 groupName = fixMaxLength(
185 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
186
187 JobKey jobKey = new JobKey(jobName, groupName);
188
189 return getScheduledJob(scheduler, jobKey);
190 }
191 catch (Exception e) {
192 throw new SchedulerException(
193 "Unable to get job {jobName=" + jobName + ", groupName=" +
194 groupName + "}",
195 e);
196 }
197 }
198
199 public List<SchedulerResponse> getScheduledJobs()
200 throws SchedulerException {
201
202 if (!PropsValues.SCHEDULER_ENABLED) {
203 return Collections.emptyList();
204 }
205
206 try {
207 List<String> groupNames = _persistedScheduler.getJobGroupNames();
208
209 List<SchedulerResponse> schedulerResponses =
210 new ArrayList<SchedulerResponse>();
211
212 for (String groupName : groupNames) {
213 schedulerResponses.addAll(
214 getScheduledJobs(_persistedScheduler, groupName));
215 }
216
217 groupNames = _memoryScheduler.getJobGroupNames();
218
219 for (String groupName : groupNames) {
220 schedulerResponses.addAll(
221 getScheduledJobs(_memoryScheduler, groupName));
222 }
223
224 return schedulerResponses;
225 }
226 catch (Exception e) {
227 throw new SchedulerException("Unable to get jobs", e);
228 }
229 }
230
231 public List<SchedulerResponse> getScheduledJobs(String groupName)
232 throws SchedulerException {
233
234 if (!PropsValues.SCHEDULER_ENABLED) {
235 return Collections.emptyList();
236 }
237
238 try {
239 Scheduler scheduler = getScheduler(groupName);
240
241 return getScheduledJobs(scheduler, groupName);
242 }
243 catch (Exception e) {
244 throw new SchedulerException(
245 "Unable to get jobs in group " + groupName, e);
246 }
247 }
248
249 public void pause(String groupName) throws SchedulerException {
250 if (!PropsValues.SCHEDULER_ENABLED) {
251 return;
252 }
253
254 try {
255 Scheduler scheduler = getScheduler(groupName);
256
257 groupName = fixMaxLength(
258 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
259
260 Set<JobKey> jobKeys = scheduler.getJobKeys(
261 GroupMatcher.jobGroupEquals(groupName));
262
263 for (JobKey jobKey : jobKeys) {
264 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
265 }
266
267 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
268 }
269 catch (Exception e) {
270 throw new SchedulerException(
271 "Unable to pause jobs in group " + groupName, e);
272 }
273 }
274
275 public void pause(String jobName, String groupName)
276 throws SchedulerException {
277
278 if (!PropsValues.SCHEDULER_ENABLED) {
279 return;
280 }
281
282 try {
283 Scheduler scheduler = getScheduler(groupName);
284
285 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
286 groupName = fixMaxLength(
287 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
288
289 JobKey jobKey = new JobKey(jobName, groupName);
290
291 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
292
293 scheduler.pauseJob(jobKey);
294 }
295 catch (Exception e) {
296 throw new SchedulerException(
297 "Unable to pause job {jobName=" + jobName + ", groupName=" +
298 groupName + "}",
299 e);
300 }
301 }
302
303 public void resume(String groupName) throws SchedulerException {
304 if (!PropsValues.SCHEDULER_ENABLED) {
305 return;
306 }
307
308 try {
309 Scheduler scheduler = getScheduler(groupName);
310
311 groupName = fixMaxLength(
312 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
313
314 Set<JobKey> jobKeys = scheduler.getJobKeys(
315 GroupMatcher.jobGroupEquals(groupName));
316
317 for (JobKey jobKey : jobKeys) {
318 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
319 }
320
321 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
322 }
323 catch (Exception e) {
324 throw new SchedulerException(
325 "Unable to resume jobs in group " + groupName, e);
326 }
327 }
328
329 public void resume(String jobName, String groupName)
330 throws SchedulerException {
331
332 if (!PropsValues.SCHEDULER_ENABLED) {
333 return;
334 }
335
336 try {
337 Scheduler scheduler = getScheduler(groupName);
338
339 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
340 groupName = fixMaxLength(
341 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
342
343 JobKey jobKey = new JobKey(jobName, groupName);
344
345 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
346
347 scheduler.resumeJob(jobKey);
348 }
349 catch (Exception e) {
350 throw new SchedulerException(
351 "Unable to resume job {jobName=" + jobName + ", groupName=" +
352 groupName + "}",
353 e);
354 }
355 }
356
357 public void schedule(
358 com.liferay.portal.kernel.scheduler.Trigger trigger,
359 String description, String destination, Message message)
360 throws SchedulerException {
361
362 if (!PropsValues.SCHEDULER_ENABLED) {
363 return;
364 }
365
366 try {
367 Scheduler scheduler = getScheduler(trigger.getGroupName());
368
369 StorageType storageType = getStorageType(trigger.getGroupName());
370
371 trigger = TriggerFactoryUtil.buildTrigger(
372 trigger.getTriggerType(), trigger.getJobName(),
373 getOriginalGroupName(trigger.getGroupName()),
374 trigger.getStartDate(), trigger.getEndDate(),
375 trigger.getTriggerContent());
376
377 Trigger quartzTrigger = getQuartzTrigger(trigger);
378
379 if (quartzTrigger == null) {
380 return;
381 }
382
383 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
384
385 if (message == null) {
386 message = new Message();
387 }
388 else {
389 message = message.clone();
390 }
391
392 registerMessageListeners(
393 trigger.getJobName(), trigger.getGroupName(), destination,
394 message);
395
396 schedule(
397 scheduler, storageType, quartzTrigger, description, destination,
398 message);
399 }
400 catch (RuntimeException re) {
401
402
403
404
405 }
406 catch (Exception e) {
407 throw new SchedulerException("Unable to schedule job", e);
408 }
409 }
410
411 public void shutdown() throws SchedulerException {
412 if (!PropsValues.SCHEDULER_ENABLED) {
413 return;
414 }
415
416 try {
417 if (!_persistedScheduler.isShutdown()) {
418 _persistedScheduler.shutdown(false);
419 }
420
421 if (!_memoryScheduler.isShutdown()) {
422 _memoryScheduler.shutdown(false);
423 }
424 }
425 catch (Exception e) {
426 throw new SchedulerException("Unable to shutdown scheduler", e);
427 }
428 }
429
430 public void start() throws SchedulerException {
431 if (!PropsValues.SCHEDULER_ENABLED) {
432 return;
433 }
434
435 try {
436 _persistedScheduler.start();
437
438 initJobState();
439
440 _memoryScheduler.start();
441 }
442 catch (Exception e) {
443 throw new SchedulerException("Unable to start scheduler", e);
444 }
445 }
446
447 public void suppressError(String jobName, String groupName)
448 throws SchedulerException {
449
450 if (!PropsValues.SCHEDULER_ENABLED) {
451 return;
452 }
453
454 try {
455 Scheduler scheduler = getScheduler(groupName);
456
457 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
458 groupName = fixMaxLength(
459 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
460
461 JobKey jobKey = new JobKey(jobName, groupName);
462
463 updateJobState(scheduler, jobKey, null, true);
464 }
465 catch (Exception e) {
466 throw new SchedulerException(
467 "Unable to suppress error for job {jobName=" + jobName +
468 ", groupName=" + groupName + "}",
469 e);
470 }
471 }
472
473 public void unschedule(String groupName) throws SchedulerException {
474 if (!PropsValues.SCHEDULER_ENABLED) {
475 return;
476 }
477
478 try {
479 Scheduler scheduler = getScheduler(groupName);
480
481 groupName = fixMaxLength(
482 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
483
484 Set<JobKey> jobKeys = scheduler.getJobKeys(
485 GroupMatcher.jobGroupEquals(groupName));
486
487 for (JobKey jobKey : jobKeys) {
488 unschedule(scheduler, jobKey);
489 }
490 }
491 catch (Exception e) {
492 throw new SchedulerException(
493 "Unable to unschedule jobs in group " + groupName, e);
494 }
495 }
496
497 public void unschedule(String jobName, String groupName)
498 throws SchedulerException {
499
500 if (!PropsValues.SCHEDULER_ENABLED) {
501 return;
502 }
503
504 try {
505 Scheduler scheduler = getScheduler(groupName);
506
507 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
508 groupName = fixMaxLength(
509 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
510
511 JobKey jobKey = new JobKey(jobName, groupName);
512
513 unschedule(scheduler, jobKey);
514 }
515 catch (Exception e) {
516 throw new SchedulerException(
517 "Unable to unschedule job {jobName=" + jobName +
518 ", groupName=" + groupName + "}",
519 e);
520 }
521 }
522
523 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
524 throws SchedulerException {
525
526 if (!PropsValues.SCHEDULER_ENABLED) {
527 return;
528 }
529
530 try {
531 Scheduler scheduler = getScheduler(trigger.getGroupName());
532
533 trigger = TriggerFactoryUtil.buildTrigger(
534 trigger.getTriggerType(), trigger.getJobName(),
535 getOriginalGroupName(trigger.getGroupName()),
536 trigger.getStartDate(), trigger.getEndDate(),
537 trigger.getTriggerContent());
538
539 update(scheduler, trigger);
540 }
541 catch (Exception e) {
542 throw new SchedulerException("Unable to update trigger", e);
543 }
544 }
545
546 protected String fixMaxLength(String argument, int maxLength) {
547 if (argument == null) {
548 return null;
549 }
550
551 if (argument.length() > maxLength) {
552 argument = argument.substring(0, maxLength);
553 }
554
555 return argument;
556 }
557
558 protected String getFullName(String jobName, String groupName) {
559 return groupName.concat(StringPool.PERIOD).concat(jobName);
560 }
561
562 protected JobState getJobState(JobDataMap jobDataMap) {
563 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
564 JOB_STATE);
565
566 return JobStateSerializeUtil.deserialize(jobStateMap);
567 }
568
569 protected Message getMessage(JobDataMap jobDataMap) {
570 String messageJSON = (String)jobDataMap.get(MESSAGE);
571
572 return (Message)JSONFactoryUtil.deserialize(messageJSON);
573 }
574
575 protected MessageListener getMessageListener(
576 String messageListenerClassName, ClassLoader classLoader)
577 throws SchedulerException {
578
579 MessageListener schedulerEventListener = null;
580
581 try {
582 Class<? extends MessageListener> clazz =
583 (Class<? extends MessageListener>)classLoader.loadClass(
584 messageListenerClassName);
585
586 schedulerEventListener = clazz.newInstance();
587
588 schedulerEventListener =
589 (MessageListener)ProxyUtil.newProxyInstance(
590 classLoader, new Class<?>[] {MessageListener.class},
591 new ClassLoaderBeanHandler(
592 schedulerEventListener, classLoader));
593 }
594 catch (Exception e) {
595 throw new SchedulerException(
596 "Unable to register message listener with name " +
597 messageListenerClassName,
598 e);
599 }
600
601 return schedulerEventListener;
602 }
603
604 protected String getOriginalGroupName(String groupName) {
605 int pos = groupName.indexOf(CharPool.POUND);
606
607 return groupName.substring(pos + 1);
608 }
609
610 protected Trigger getQuartzTrigger(
611 com.liferay.portal.kernel.scheduler.Trigger trigger)
612 throws SchedulerException {
613
614 if (trigger == null) {
615 return null;
616 }
617
618 Date endDate = trigger.getEndDate();
619 String jobName = fixMaxLength(
620 trigger.getJobName(), JOB_NAME_MAX_LENGTH);
621 String groupName = fixMaxLength(
622 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH);
623
624 Date startDate = trigger.getStartDate();
625
626 if (startDate == null) {
627 startDate = new Date(System.currentTimeMillis());
628 }
629
630 Trigger quartzTrigger = null;
631
632 TriggerType triggerType = trigger.getTriggerType();
633
634 if (triggerType.equals(TriggerType.CRON)) {
635 try {
636 TriggerBuilder<Trigger>triggerBuilder =
637 TriggerBuilder.newTrigger();
638
639 triggerBuilder.endAt(endDate);
640 triggerBuilder.forJob(jobName, groupName);
641 triggerBuilder.startAt(startDate);
642 triggerBuilder.withIdentity(jobName, groupName);
643
644 CronScheduleBuilder cronScheduleBuilder =
645 CronScheduleBuilder.cronSchedule(
646 (String)trigger.getTriggerContent());
647
648 triggerBuilder.withSchedule(cronScheduleBuilder);
649
650 quartzTrigger = triggerBuilder.build();
651 }
652 catch (ParseException pe) {
653 throw new SchedulerException(
654 "Unable to parse cron text " + trigger.getTriggerContent());
655 }
656 }
657 else if (triggerType.equals(TriggerType.SIMPLE)) {
658 long interval = (Long)trigger.getTriggerContent();
659
660 if (interval <= 0) {
661 if (_log.isDebugEnabled()) {
662 _log.debug(
663 "Not scheduling " + trigger.getJobName() +
664 " because interval is less than or equal to 0");
665 }
666
667 return null;
668 }
669
670 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
671
672 triggerBuilder.endAt(endDate);
673 triggerBuilder.forJob(jobName, groupName);
674 triggerBuilder.startAt(startDate);
675 triggerBuilder.withIdentity(jobName, groupName);
676
677 SimpleScheduleBuilder simpleScheduleBuilder =
678 SimpleScheduleBuilder.simpleSchedule();
679
680 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
681 simpleScheduleBuilder.withRepeatCount(
682 SimpleTrigger.REPEAT_INDEFINITELY);
683
684 triggerBuilder.withSchedule(simpleScheduleBuilder);
685
686 quartzTrigger = triggerBuilder.build();
687 }
688 else {
689 throw new SchedulerException(
690 "Unknown trigger type " + trigger.getTriggerType());
691 }
692
693 return quartzTrigger;
694 }
695
696 protected SchedulerResponse getScheduledJob(
697 Scheduler scheduler, JobKey jobKey)
698 throws Exception {
699
700 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
701
702 if (jobDetail == null) {
703 return null;
704 }
705
706 JobDataMap jobDataMap = jobDetail.getJobDataMap();
707
708 String description = jobDataMap.getString(DESCRIPTION);
709 String destinationName = jobDataMap.getString(DESTINATION_NAME);
710 Message message = getMessage(jobDataMap);
711 StorageType storageType = StorageType.valueOf(
712 jobDataMap.getString(STORAGE_TYPE));
713
714 SchedulerResponse schedulerResponse = null;
715
716 String jobName = jobKey.getName();
717 String groupName = jobKey.getGroup();
718
719 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
720
721 Trigger trigger = scheduler.getTrigger(triggerKey);
722
723 JobState jobState = getJobState(jobDataMap);
724
725 message.put(JOB_STATE, jobState);
726
727 if (trigger == null) {
728 schedulerResponse = new SchedulerResponse();
729
730 schedulerResponse.setDescription(description);
731 schedulerResponse.setDestinationName(destinationName);
732 schedulerResponse.setGroupName(groupName);
733 schedulerResponse.setJobName(jobName);
734 schedulerResponse.setMessage(message);
735 schedulerResponse.setStorageType(storageType);
736 }
737 else {
738 message.put(END_TIME, trigger.getEndTime());
739 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
740 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
741 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
742 message.put(START_TIME, trigger.getStartTime());
743
744 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
745
746 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
747
748 schedulerResponse = new SchedulerResponse();
749
750 schedulerResponse.setDescription(description);
751 schedulerResponse.setDestinationName(destinationName);
752 schedulerResponse.setMessage(message);
753 schedulerResponse.setStorageType(storageType);
754 schedulerResponse.setTrigger(
755 new com.liferay.portal.kernel.scheduler.CronTrigger(
756 jobName, groupName, cronTrigger.getStartTime(),
757 cronTrigger.getEndTime(),
758 cronTrigger.getCronExpression()));
759 }
760 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
761 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
762
763 schedulerResponse = new SchedulerResponse();
764
765 schedulerResponse.setDescription(description);
766 schedulerResponse.setDestinationName(destinationName);
767 schedulerResponse.setMessage(message);
768 schedulerResponse.setStorageType(storageType);
769 schedulerResponse.setTrigger(
770 new IntervalTrigger(
771 jobName, groupName, simpleTrigger.getStartTime(),
772 simpleTrigger.getEndTime(),
773 simpleTrigger.getRepeatInterval()));
774 }
775 }
776
777 return schedulerResponse;
778 }
779
780 protected List<SchedulerResponse> getScheduledJobs(
781 Scheduler scheduler, String groupName)
782 throws Exception {
783
784 groupName = fixMaxLength(
785 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
786
787 List<SchedulerResponse> schedulerResponses =
788 new ArrayList<SchedulerResponse>();
789
790 Set<JobKey> jobKeys = scheduler.getJobKeys(
791 GroupMatcher.jobGroupEquals(groupName));
792
793 for (JobKey jobKey : jobKeys) {
794 SchedulerResponse schedulerResponse = getScheduledJob(
795 scheduler, jobKey);
796
797 if (schedulerResponse != null) {
798 schedulerResponses.add(schedulerResponse);
799 }
800 }
801
802 return schedulerResponses;
803 }
804
805 protected Scheduler getScheduler(String groupName) throws Exception {
806 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
807 return _persistedScheduler;
808 }
809 else {
810 return _memoryScheduler;
811 }
812 }
813
814 protected StorageType getStorageType(String groupName) {
815 int pos = groupName.indexOf(CharPool.POUND);
816
817 String storageTypeString = groupName.substring(0, pos);
818
819 return StorageType.valueOf(storageTypeString);
820 }
821
822 protected Scheduler initializeScheduler(
823 String propertiesPrefix, boolean useQuartzCluster)
824 throws Exception {
825
826 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
827
828 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
829
830 if (useQuartzCluster) {
831 DB db = DBFactoryUtil.getDB();
832
833 String dbType = db.getType();
834
835 if (dbType.equals(DB.TYPE_SQLSERVER)) {
836 properties.setProperty(
837 "org.quartz.jobStore.lockHandler.class",
838 UpdateLockRowSemaphore.class.getName());
839 }
840
841 if (PropsValues.CLUSTER_LINK_ENABLED) {
842 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
843 _log.error("Unable to cluster scheduler on Hypersonic");
844 }
845 else {
846 properties.put(
847 "org.quartz.jobStore.isClustered",
848 Boolean.TRUE.toString());
849 }
850 }
851 }
852
853 schedulerFactory.initialize(properties);
854
855 return schedulerFactory.getScheduler();
856 }
857
858 protected void initJobState() throws Exception {
859 List<String> groupNames = _persistedScheduler.getJobGroupNames();
860
861 for (String groupName : groupNames) {
862 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
863 GroupMatcher.jobGroupEquals(groupName));
864
865 for (JobKey jobKey : jobkeys) {
866 Trigger trigger = _persistedScheduler.getTrigger(
867 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
868
869 if (trigger != null) {
870 continue;
871 }
872
873 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
874
875 JobDataMap jobDataMap = jobDetail.getJobDataMap();
876
877 Message message = getMessage(jobDataMap);
878
879 message.put(JOB_NAME, jobKey.getName());
880 message.put(GROUP_NAME, jobKey.getGroup());
881
882 SchedulerEngineHelperUtil.auditSchedulerJobs(
883 message, TriggerState.EXPIRED);
884
885 _persistedScheduler.deleteJob(jobKey);
886 }
887 }
888 }
889
890 protected void registerMessageListeners(
891 String jobName, String groupName, String destinationName,
892 Message message)
893 throws SchedulerException {
894
895 String messageListenerClassName = message.getString(
896 MESSAGE_LISTENER_CLASS_NAME);
897
898 if (Validator.isNull(messageListenerClassName)) {
899 return;
900 }
901
902 String portletId = message.getString(PORTLET_ID);
903
904 ClassLoader classLoader = null;
905
906 if (Validator.isNull(portletId)) {
907 classLoader = PACLClassLoaderUtil.getPortalClassLoader();
908 }
909 else {
910 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
911
912 if (classLoader == null) {
913
914
915
916
917
918 classLoader = ClassLoaderPool.getClassLoader(portletId);
919 }
920 }
921
922 if (classLoader == null) {
923 throw new SchedulerException(
924 "Unable to find class loader for portlet " + portletId);
925 }
926
927 MessageListener schedulerEventListener = getMessageListener(
928 messageListenerClassName, classLoader);
929
930 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
931 new SchedulerEventMessageListenerWrapper();
932
933 schedulerEventListenerWrapper.setGroupName(groupName);
934 schedulerEventListenerWrapper.setJobName(jobName);
935 schedulerEventListenerWrapper.setMessageListener(
936 schedulerEventListener);
937
938 schedulerEventListenerWrapper.afterPropertiesSet();
939
940 MessageBusUtil.registerMessageListener(
941 destinationName, schedulerEventListenerWrapper);
942
943 message.put(
944 MESSAGE_LISTENER_UUID,
945 schedulerEventListenerWrapper.getMessageListenerUUID());
946
947 message.put(RECEIVER_KEY, getFullName(jobName, groupName));
948 }
949
950 protected void schedule(
951 Scheduler scheduler, StorageType storageType, Trigger trigger,
952 String description, String destinationName, Message message)
953 throws Exception {
954
955 try {
956 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
957
958 jobBuilder.withIdentity(trigger.getJobKey());
959
960 JobDetail jobDetail = jobBuilder.build();
961
962 JobDataMap jobDataMap = jobDetail.getJobDataMap();
963
964 jobDataMap.put(DESCRIPTION, description);
965 jobDataMap.put(DESTINATION_NAME, destinationName);
966 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
967 jobDataMap.put(STORAGE_TYPE, storageType.toString());
968
969 JobState jobState = new JobState(
970 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
971
972 jobDataMap.put(
973 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
974
975 unregisterMessageListener(scheduler, trigger.getJobKey());
976
977 synchronized (this) {
978 scheduler.deleteJob(trigger.getJobKey());
979 scheduler.scheduleJob(jobDetail, trigger);
980 }
981 }
982 catch (ObjectAlreadyExistsException oaee) {
983 if (_log.isInfoEnabled()) {
984 _log.info("Message is already scheduled");
985 }
986 }
987 }
988
989 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
990 throws Exception {
991
992 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
993
994 if (jobDetail == null) {
995 return;
996 }
997
998 JobDataMap jobDataMap = jobDetail.getJobDataMap();
999
1000 if (jobDataMap == null) {
1001 return;
1002 }
1003
1004 Message message = getMessage(jobDataMap);
1005
1006 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1007
1008 if (messageListenerUUID == null) {
1009 return;
1010 }
1011
1012 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1013
1014 MessageBus messageBus = MessageBusUtil.getMessageBus();
1015
1016 Destination destination = messageBus.getDestination(destinationName);
1017
1018 Set<MessageListener> messageListeners =
1019 destination.getMessageListeners();
1020
1021 for (MessageListener messageListener : messageListeners) {
1022 if (!(messageListener instanceof InvokerMessageListener)) {
1023 continue;
1024 }
1025
1026 InvokerMessageListener invokerMessageListener =
1027 (InvokerMessageListener)messageListener;
1028
1029 messageListener = invokerMessageListener.getMessageListener();
1030
1031 if (!(messageListener instanceof
1032 SchedulerEventMessageListenerWrapper)) {
1033
1034 continue;
1035 }
1036
1037 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1038 (SchedulerEventMessageListenerWrapper)messageListener;
1039
1040 if (messageListenerUUID.equals(
1041 schedulerMessageListener.getMessageListenerUUID())) {
1042
1043 messageBus.unregisterMessageListener(
1044 destinationName, schedulerMessageListener);
1045
1046 return;
1047 }
1048 }
1049 }
1050
1051 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1052 throws Exception {
1053
1054 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1055
1056 TriggerKey triggerKey = new TriggerKey(
1057 jobKey.getName(), jobKey.getGroup());
1058
1059 if (jobDetail == null) {
1060 return;
1061 }
1062
1063 unregisterMessageListener(scheduler, jobKey);
1064
1065 if (scheduler == _memoryScheduler) {
1066 scheduler.unscheduleJob(triggerKey);
1067
1068 return;
1069 }
1070
1071 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1072
1073 JobState jobState = getJobState(jobDataMap);
1074
1075 Trigger trigger = scheduler.getTrigger(triggerKey);
1076
1077 jobState.setTriggerDate(END_TIME, new Date());
1078 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1079 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1080 jobState.setTriggerDate(
1081 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1082 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1083
1084 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1085
1086 jobState.clearExceptions();
1087
1088 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1089
1090 scheduler.unscheduleJob(triggerKey);
1091
1092 scheduler.addJob(jobDetail, true);
1093 }
1094
1095 protected void update(
1096 Scheduler scheduler,
1097 com.liferay.portal.kernel.scheduler.Trigger trigger)
1098 throws Exception {
1099
1100 Trigger quartzTrigger = getQuartzTrigger(trigger);
1101
1102 if (quartzTrigger == null) {
1103 return;
1104 }
1105
1106 TriggerKey triggerKey = quartzTrigger.getKey();
1107
1108 if (scheduler.getTrigger(triggerKey) != null) {
1109 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1110 }
1111 else {
1112 JobKey jobKey = quartzTrigger.getJobKey();
1113
1114 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1115
1116 if (jobDetail == null) {
1117 return;
1118 }
1119
1120 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1121
1122 synchronized (this) {
1123 scheduler.deleteJob(jobKey);
1124 scheduler.scheduleJob(jobDetail, quartzTrigger);
1125 }
1126 }
1127 }
1128
1129 protected void updateJobState(
1130 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1131 boolean suppressError)
1132 throws Exception {
1133
1134 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1135
1136 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1137
1138 JobState jobState = getJobState(jobDataMap);
1139
1140 if (triggerState != null) {
1141 jobState.setTriggerState(triggerState);
1142 }
1143
1144 if (suppressError) {
1145 jobState.clearExceptions();
1146 }
1147
1148 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1149
1150 scheduler.addJob(jobDetail, true);
1151 }
1152
1153 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1154 protected QuartzLocalService quartzLocalService;
1155
1156 private static Log _log = LogFactoryUtil.getLog(
1157 QuartzSchedulerEngine.class);
1158
1159 private Scheduler _memoryScheduler;
1160 private Scheduler _persistedScheduler;
1161
1162 }