001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.TriggerType;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.CharPool;
044 import com.liferay.portal.kernel.util.ProxyUtil;
045 import com.liferay.portal.kernel.util.ServerDetector;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.kernel.util.Time;
048 import com.liferay.portal.kernel.util.Validator;
049 import com.liferay.portal.scheduler.job.MessageSenderJob;
050 import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
051 import com.liferay.portal.service.QuartzLocalService;
052 import com.liferay.portal.util.PropsUtil;
053 import com.liferay.portal.util.PropsValues;
054
055 import java.text.ParseException;
056
057 import java.util.ArrayList;
058 import java.util.Collections;
059 import java.util.Date;
060 import java.util.List;
061 import java.util.Map;
062 import java.util.Properties;
063 import java.util.Set;
064
065 import org.quartz.CronScheduleBuilder;
066 import org.quartz.CronTrigger;
067 import org.quartz.JobBuilder;
068 import org.quartz.JobDataMap;
069 import org.quartz.JobDetail;
070 import org.quartz.JobKey;
071 import org.quartz.ObjectAlreadyExistsException;
072 import org.quartz.Scheduler;
073 import org.quartz.SimpleScheduleBuilder;
074 import org.quartz.SimpleTrigger;
075 import org.quartz.Trigger;
076 import org.quartz.TriggerBuilder;
077 import org.quartz.TriggerKey;
078 import org.quartz.impl.StdSchedulerFactory;
079 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
080 import org.quartz.impl.matchers.GroupMatcher;
081
082
089 public class QuartzSchedulerEngine implements SchedulerEngine {
090
091 public void afterPropertiesSet() {
092 if (!PropsValues.SCHEDULER_ENABLED) {
093 return;
094 }
095
096 try {
097 quartzLocalService.checkQuartzTables();
098
099 _persistedScheduler = initializeScheduler(
100 "persisted.scheduler.", true);
101
102 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
103 }
104 catch (Exception e) {
105 _log.error("Unable to initialize engine", e);
106 }
107 }
108
109 public void delete(String groupName) throws SchedulerException {
110 if (!PropsValues.SCHEDULER_ENABLED) {
111 return;
112 }
113
114 try {
115 Scheduler scheduler = getScheduler(groupName);
116
117 groupName = fixMaxLength(
118 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
119
120 Set<JobKey> jobKeys = scheduler.getJobKeys(
121 GroupMatcher.jobGroupEquals(groupName));
122
123 for (JobKey jobKey : jobKeys) {
124 unregisterMessageListener(scheduler, jobKey);
125
126 scheduler.deleteJob(jobKey);
127 }
128 }
129 catch (Exception e) {
130 throw new SchedulerException(
131 "Unable to delete jobs in group " + groupName, e);
132 }
133 }
134
135 public void delete(String jobName, String groupName)
136 throws SchedulerException {
137
138 if (!PropsValues.SCHEDULER_ENABLED) {
139 return;
140 }
141
142 try {
143 Scheduler scheduler = getScheduler(groupName);
144
145 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
146 groupName = fixMaxLength(
147 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
148
149 JobKey jobKey = new JobKey(jobName, groupName);
150
151 unregisterMessageListener(scheduler, jobKey);
152
153 scheduler.deleteJob(jobKey);
154 }
155 catch (Exception e) {
156 throw new SchedulerException(
157 "Unable to delete job {jobName=" + jobName + ", groupName=" +
158 groupName + "}",
159 e);
160 }
161 }
162
163 public void destroy() {
164 try {
165 shutdown();
166 }
167 catch (SchedulerException se) {
168 if (_log.isWarnEnabled()) {
169 _log.warn("Unable to shutdown", se);
170 }
171 }
172 }
173
174 public SchedulerResponse getScheduledJob(String jobName, String groupName)
175 throws SchedulerException {
176
177 if (!PropsValues.SCHEDULER_ENABLED) {
178 return null;
179 }
180
181 try {
182 Scheduler scheduler = getScheduler(groupName);
183
184 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
185 groupName = fixMaxLength(
186 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
187
188 JobKey jobKey = new JobKey(jobName, groupName);
189
190 return getScheduledJob(scheduler, jobKey);
191 }
192 catch (Exception e) {
193 throw new SchedulerException(
194 "Unable to get job {jobName=" + jobName + ", groupName=" +
195 groupName + "}",
196 e);
197 }
198 }
199
200 public List<SchedulerResponse> getScheduledJobs()
201 throws SchedulerException {
202
203 if (!PropsValues.SCHEDULER_ENABLED) {
204 return Collections.emptyList();
205 }
206
207 try {
208 List<String> groupNames = _persistedScheduler.getJobGroupNames();
209
210 List<SchedulerResponse> schedulerResponses =
211 new ArrayList<SchedulerResponse>();
212
213 for (String groupName : groupNames) {
214 schedulerResponses.addAll(
215 getScheduledJobs(_persistedScheduler, groupName));
216 }
217
218 groupNames = _memoryScheduler.getJobGroupNames();
219
220 for (String groupName : groupNames) {
221 schedulerResponses.addAll(
222 getScheduledJobs(_memoryScheduler, groupName));
223 }
224
225 return schedulerResponses;
226 }
227 catch (Exception e) {
228 throw new SchedulerException("Unable to get jobs", e);
229 }
230 }
231
232 public List<SchedulerResponse> getScheduledJobs(String groupName)
233 throws SchedulerException {
234
235 if (!PropsValues.SCHEDULER_ENABLED) {
236 return Collections.emptyList();
237 }
238
239 try {
240 Scheduler scheduler = getScheduler(groupName);
241
242 return getScheduledJobs(scheduler, groupName);
243 }
244 catch (Exception e) {
245 throw new SchedulerException(
246 "Unable to get jobs in group " + groupName, e);
247 }
248 }
249
250 public void pause(String groupName) throws SchedulerException {
251 if (!PropsValues.SCHEDULER_ENABLED) {
252 return;
253 }
254
255 try {
256 Scheduler scheduler = getScheduler(groupName);
257
258 groupName = fixMaxLength(
259 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
260
261 Set<JobKey> jobKeys = scheduler.getJobKeys(
262 GroupMatcher.jobGroupEquals(groupName));
263
264 for (JobKey jobKey : jobKeys) {
265 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
266 }
267
268 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
269 }
270 catch (Exception e) {
271 throw new SchedulerException(
272 "Unable to pause jobs in group " + groupName, e);
273 }
274 }
275
276 public void pause(String jobName, String groupName)
277 throws SchedulerException {
278
279 if (!PropsValues.SCHEDULER_ENABLED) {
280 return;
281 }
282
283 try {
284 Scheduler scheduler = getScheduler(groupName);
285
286 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
287 groupName = fixMaxLength(
288 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
289
290 JobKey jobKey = new JobKey(jobName, groupName);
291
292 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
293
294 scheduler.pauseJob(jobKey);
295 }
296 catch (Exception e) {
297 throw new SchedulerException(
298 "Unable to pause job {jobName=" + jobName + ", groupName=" +
299 groupName + "}",
300 e);
301 }
302 }
303
304 public void resume(String groupName) throws SchedulerException {
305 if (!PropsValues.SCHEDULER_ENABLED) {
306 return;
307 }
308
309 try {
310 Scheduler scheduler = getScheduler(groupName);
311
312 groupName = fixMaxLength(
313 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
314
315 Set<JobKey> jobKeys = scheduler.getJobKeys(
316 GroupMatcher.jobGroupEquals(groupName));
317
318 for (JobKey jobKey : jobKeys) {
319 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
320 }
321
322 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
323 }
324 catch (Exception e) {
325 throw new SchedulerException(
326 "Unable to resume jobs in group " + groupName, e);
327 }
328 }
329
330 public void resume(String jobName, String groupName)
331 throws SchedulerException {
332
333 if (!PropsValues.SCHEDULER_ENABLED) {
334 return;
335 }
336
337 try {
338 Scheduler scheduler = getScheduler(groupName);
339
340 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
341 groupName = fixMaxLength(
342 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
343
344 JobKey jobKey = new JobKey(jobName, groupName);
345
346 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
347
348 scheduler.resumeJob(jobKey);
349 }
350 catch (Exception e) {
351 throw new SchedulerException(
352 "Unable to resume job {jobName=" + jobName + ", groupName=" +
353 groupName + "}",
354 e);
355 }
356 }
357
358 public void schedule(
359 com.liferay.portal.kernel.scheduler.Trigger trigger,
360 String description, String destination, Message message)
361 throws SchedulerException {
362
363 if (!PropsValues.SCHEDULER_ENABLED) {
364 return;
365 }
366
367 try {
368 Scheduler scheduler = getScheduler(trigger.getGroupName());
369
370 StorageType storageType = getStorageType(trigger.getGroupName());
371
372 trigger = TriggerFactoryUtil.buildTrigger(
373 trigger.getTriggerType(), trigger.getJobName(),
374 getOriginalGroupName(trigger.getGroupName()),
375 trigger.getStartDate(), trigger.getEndDate(),
376 trigger.getTriggerContent());
377
378 Trigger quartzTrigger = getQuartzTrigger(trigger);
379
380 if (quartzTrigger == null) {
381 return;
382 }
383
384 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
385
386 if (message == null) {
387 message = new Message();
388 }
389 else {
390 message = message.clone();
391 }
392
393 registerMessageListeners(
394 trigger.getJobName(), trigger.getGroupName(), destination,
395 message);
396
397 schedule(
398 scheduler, storageType, quartzTrigger, description, destination,
399 message);
400 }
401 catch (RuntimeException re) {
402
403
404
405
406 }
407 catch (Exception e) {
408 throw new SchedulerException("Unable to schedule job", e);
409 }
410 }
411
412 public void shutdown() throws SchedulerException {
413 if (!PropsValues.SCHEDULER_ENABLED) {
414 return;
415 }
416
417 try {
418 if (!_persistedScheduler.isShutdown()) {
419 _persistedScheduler.shutdown(false);
420 }
421
422 if (!_memoryScheduler.isShutdown()) {
423 _memoryScheduler.shutdown(false);
424 }
425 }
426 catch (Exception e) {
427 throw new SchedulerException("Unable to shutdown scheduler", e);
428 }
429 }
430
431 public void start() throws SchedulerException {
432 if (!PropsValues.SCHEDULER_ENABLED) {
433 return;
434 }
435
436 try {
437 _persistedScheduler.start();
438
439 initJobState();
440
441 _memoryScheduler.start();
442 }
443 catch (Exception e) {
444 throw new SchedulerException("Unable to start scheduler", e);
445 }
446 }
447
448 public void suppressError(String jobName, String groupName)
449 throws SchedulerException {
450
451 if (!PropsValues.SCHEDULER_ENABLED) {
452 return;
453 }
454
455 try {
456 Scheduler scheduler = getScheduler(groupName);
457
458 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
459 groupName = fixMaxLength(
460 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
461
462 JobKey jobKey = new JobKey(jobName, groupName);
463
464 updateJobState(scheduler, jobKey, null, true);
465 }
466 catch (Exception e) {
467 throw new SchedulerException(
468 "Unable to suppress error for job {jobName=" + jobName +
469 ", groupName=" + groupName + "}",
470 e);
471 }
472 }
473
474 public void unschedule(String groupName) throws SchedulerException {
475 if (!PropsValues.SCHEDULER_ENABLED) {
476 return;
477 }
478
479 try {
480 Scheduler scheduler = getScheduler(groupName);
481
482 groupName = fixMaxLength(
483 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
484
485 Set<JobKey> jobKeys = scheduler.getJobKeys(
486 GroupMatcher.jobGroupEquals(groupName));
487
488 for (JobKey jobKey : jobKeys) {
489 unschedule(scheduler, jobKey);
490 }
491 }
492 catch (Exception e) {
493 throw new SchedulerException(
494 "Unable to unschedule jobs in group " + groupName, e);
495 }
496 }
497
498 public void unschedule(String jobName, String groupName)
499 throws SchedulerException {
500
501 if (!PropsValues.SCHEDULER_ENABLED) {
502 return;
503 }
504
505 try {
506 Scheduler scheduler = getScheduler(groupName);
507
508 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
509 groupName = fixMaxLength(
510 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
511
512 JobKey jobKey = new JobKey(jobName, groupName);
513
514 unschedule(scheduler, jobKey);
515 }
516 catch (Exception e) {
517 throw new SchedulerException(
518 "Unable to unschedule job {jobName=" + jobName +
519 ", groupName=" + groupName + "}",
520 e);
521 }
522 }
523
524 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
525 throws SchedulerException {
526
527 if (!PropsValues.SCHEDULER_ENABLED) {
528 return;
529 }
530
531 try {
532 Scheduler scheduler = getScheduler(trigger.getGroupName());
533
534 trigger = TriggerFactoryUtil.buildTrigger(
535 trigger.getTriggerType(), trigger.getJobName(),
536 getOriginalGroupName(trigger.getGroupName()),
537 trigger.getStartDate(), trigger.getEndDate(),
538 trigger.getTriggerContent());
539
540 update(scheduler, trigger);
541 }
542 catch (Exception e) {
543 throw new SchedulerException("Unable to update trigger", e);
544 }
545 }
546
547 protected String fixMaxLength(String argument, int maxLength) {
548 if (argument == null) {
549 return null;
550 }
551
552 if (argument.length() > maxLength) {
553 argument = argument.substring(0, maxLength);
554 }
555
556 return argument;
557 }
558
559 protected String getFullName(String jobName, String groupName) {
560 return groupName.concat(StringPool.PERIOD).concat(jobName);
561 }
562
563 protected JobState getJobState(JobDataMap jobDataMap) {
564 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
565 JOB_STATE);
566
567 return JobStateSerializeUtil.deserialize(jobStateMap);
568 }
569
570 protected Message getMessage(JobDataMap jobDataMap) {
571 String messageJSON = (String)jobDataMap.get(MESSAGE);
572
573 return (Message)JSONFactoryUtil.deserialize(messageJSON);
574 }
575
576 protected MessageListener getMessageListener(
577 String messageListenerClassName, ClassLoader classLoader)
578 throws SchedulerException {
579
580 MessageListener schedulerEventListener = null;
581
582 try {
583 Class<? extends MessageListener> clazz =
584 (Class<? extends MessageListener>)classLoader.loadClass(
585 messageListenerClassName);
586
587 schedulerEventListener = clazz.newInstance();
588
589 schedulerEventListener =
590 (MessageListener)ProxyUtil.newProxyInstance(
591 classLoader, new Class<?>[] {MessageListener.class},
592 new ClassLoaderBeanHandler(
593 schedulerEventListener, classLoader));
594 }
595 catch (Exception e) {
596 throw new SchedulerException(
597 "Unable to register message listener with name " +
598 messageListenerClassName,
599 e);
600 }
601
602 return schedulerEventListener;
603 }
604
605 protected String getOriginalGroupName(String groupName) {
606 int pos = groupName.indexOf(CharPool.POUND);
607
608 return groupName.substring(pos + 1);
609 }
610
611 protected Trigger getQuartzTrigger(
612 com.liferay.portal.kernel.scheduler.Trigger trigger)
613 throws SchedulerException {
614
615 if (trigger == null) {
616 return null;
617 }
618
619 Date endDate = trigger.getEndDate();
620 String jobName = fixMaxLength(
621 trigger.getJobName(), JOB_NAME_MAX_LENGTH);
622 String groupName = fixMaxLength(
623 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH);
624
625 Date startDate = trigger.getStartDate();
626
627 if (startDate == null) {
628 if (ServerDetector.isTomcat()) {
629 startDate = new Date(System.currentTimeMillis() + Time.MINUTE);
630 }
631 else {
632 startDate = new Date(
633 System.currentTimeMillis() + Time.MINUTE * 3);
634 }
635 }
636
637 Trigger quartzTrigger = null;
638
639 TriggerType triggerType = trigger.getTriggerType();
640
641 if (triggerType.equals(TriggerType.CRON)) {
642 try {
643 TriggerBuilder<Trigger>triggerBuilder =
644 TriggerBuilder.newTrigger();
645
646 triggerBuilder.endAt(endDate);
647 triggerBuilder.forJob(jobName, groupName);
648 triggerBuilder.startAt(startDate);
649 triggerBuilder.withIdentity(jobName, groupName);
650
651 CronScheduleBuilder cronScheduleBuilder =
652 CronScheduleBuilder.cronSchedule(
653 (String)trigger.getTriggerContent());
654
655 triggerBuilder.withSchedule(cronScheduleBuilder);
656
657 quartzTrigger = triggerBuilder.build();
658 }
659 catch (ParseException pe) {
660 throw new SchedulerException(
661 "Unable to parse cron text " + trigger.getTriggerContent());
662 }
663 }
664 else if (triggerType.equals(TriggerType.SIMPLE)) {
665 long interval = (Long)trigger.getTriggerContent();
666
667 if (interval <= 0) {
668 if (_log.isDebugEnabled()) {
669 _log.debug(
670 "Not scheduling " + trigger.getJobName() +
671 " because interval is less than or equal to 0");
672 }
673
674 return null;
675 }
676
677 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
678
679 triggerBuilder.endAt(endDate);
680 triggerBuilder.forJob(jobName, groupName);
681 triggerBuilder.startAt(startDate);
682 triggerBuilder.withIdentity(jobName, groupName);
683
684 SimpleScheduleBuilder simpleScheduleBuilder =
685 SimpleScheduleBuilder.simpleSchedule();
686
687 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
688 simpleScheduleBuilder.withRepeatCount(
689 SimpleTrigger.REPEAT_INDEFINITELY);
690
691 triggerBuilder.withSchedule(simpleScheduleBuilder);
692
693 quartzTrigger = triggerBuilder.build();
694 }
695 else {
696 throw new SchedulerException(
697 "Unknown trigger type " + trigger.getTriggerType());
698 }
699
700 return quartzTrigger;
701 }
702
703 protected SchedulerResponse getScheduledJob(
704 Scheduler scheduler, JobKey jobKey)
705 throws Exception {
706
707 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
708
709 if (jobDetail == null) {
710 return null;
711 }
712
713 JobDataMap jobDataMap = jobDetail.getJobDataMap();
714
715 String description = jobDataMap.getString(DESCRIPTION);
716 String destinationName = jobDataMap.getString(DESTINATION_NAME);
717 Message message = getMessage(jobDataMap);
718 StorageType storageType = StorageType.valueOf(
719 jobDataMap.getString(STORAGE_TYPE));
720
721 SchedulerResponse schedulerResponse = null;
722
723 String jobName = jobKey.getName();
724 String groupName = jobKey.getGroup();
725
726 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
727
728 Trigger trigger = scheduler.getTrigger(triggerKey);
729
730 JobState jobState = getJobState(jobDataMap);
731
732 message.put(JOB_STATE, jobState);
733
734 if (trigger == null) {
735 schedulerResponse = new SchedulerResponse();
736
737 schedulerResponse.setDescription(description);
738 schedulerResponse.setDestinationName(destinationName);
739 schedulerResponse.setGroupName(groupName);
740 schedulerResponse.setJobName(jobName);
741 schedulerResponse.setMessage(message);
742 schedulerResponse.setStorageType(storageType);
743 }
744 else {
745 message.put(END_TIME, trigger.getEndTime());
746 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
747 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
748 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
749 message.put(START_TIME, trigger.getStartTime());
750
751 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
752
753 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
754
755 schedulerResponse = new SchedulerResponse();
756
757 schedulerResponse.setDescription(description);
758 schedulerResponse.setDestinationName(destinationName);
759 schedulerResponse.setMessage(message);
760 schedulerResponse.setStorageType(storageType);
761 schedulerResponse.setTrigger(
762 new com.liferay.portal.kernel.scheduler.CronTrigger(
763 jobName, groupName, cronTrigger.getStartTime(),
764 cronTrigger.getEndTime(),
765 cronTrigger.getCronExpression()));
766 }
767 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
768 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
769
770 schedulerResponse = new SchedulerResponse();
771
772 schedulerResponse.setDescription(description);
773 schedulerResponse.setDestinationName(destinationName);
774 schedulerResponse.setMessage(message);
775 schedulerResponse.setStorageType(storageType);
776 schedulerResponse.setTrigger(
777 new IntervalTrigger(
778 jobName, groupName, simpleTrigger.getStartTime(),
779 simpleTrigger.getEndTime(),
780 simpleTrigger.getRepeatInterval()));
781 }
782 }
783
784 return schedulerResponse;
785 }
786
787 protected List<SchedulerResponse> getScheduledJobs(
788 Scheduler scheduler, String groupName)
789 throws Exception {
790
791 groupName = fixMaxLength(
792 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
793
794 List<SchedulerResponse> schedulerResponses =
795 new ArrayList<SchedulerResponse>();
796
797 Set<JobKey> jobKeys = scheduler.getJobKeys(
798 GroupMatcher.jobGroupEquals(groupName));
799
800 for (JobKey jobKey : jobKeys) {
801 SchedulerResponse schedulerResponse = getScheduledJob(
802 scheduler, jobKey);
803
804 if (schedulerResponse != null) {
805 schedulerResponses.add(schedulerResponse);
806 }
807 }
808
809 return schedulerResponses;
810 }
811
812 protected Scheduler getScheduler(String groupName) throws Exception {
813 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
814 return _persistedScheduler;
815 }
816 else {
817 return _memoryScheduler;
818 }
819 }
820
821 protected StorageType getStorageType(String groupName) {
822 int pos = groupName.indexOf(CharPool.POUND);
823
824 String storageTypeString = groupName.substring(0, pos);
825
826 return StorageType.valueOf(storageTypeString);
827 }
828
829 protected Scheduler initializeScheduler(
830 String propertiesPrefix, boolean useQuartzCluster)
831 throws Exception {
832
833 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
834
835 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
836
837 if (useQuartzCluster) {
838 DB db = DBFactoryUtil.getDB();
839
840 String dbType = db.getType();
841
842 if (dbType.equals(DB.TYPE_SQLSERVER)) {
843 properties.setProperty(
844 "org.quartz.jobStore.lockHandler.class",
845 UpdateLockRowSemaphore.class.getName());
846 }
847
848 if (PropsValues.CLUSTER_LINK_ENABLED) {
849 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
850 _log.error("Unable to cluster scheduler on Hypersonic");
851 }
852 else {
853 properties.put(
854 "org.quartz.jobStore.isClustered",
855 Boolean.TRUE.toString());
856 }
857 }
858 }
859
860 schedulerFactory.initialize(properties);
861
862 return schedulerFactory.getScheduler();
863 }
864
865 protected void initJobState() throws Exception {
866 List<String> groupNames = _persistedScheduler.getJobGroupNames();
867
868 for (String groupName : groupNames) {
869 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
870 GroupMatcher.jobGroupEquals(groupName));
871
872 for (JobKey jobKey : jobkeys) {
873 Trigger trigger = _persistedScheduler.getTrigger(
874 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
875
876 if (trigger != null) {
877 continue;
878 }
879
880 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
881
882 JobDataMap jobDataMap = jobDetail.getJobDataMap();
883
884 Message message = getMessage(jobDataMap);
885
886 message.put(JOB_NAME, jobKey.getName());
887 message.put(GROUP_NAME, jobKey.getGroup());
888
889 SchedulerEngineUtil.auditSchedulerJobs(
890 message, TriggerState.EXPIRED);
891
892 _persistedScheduler.deleteJob(jobKey);
893 }
894 }
895 }
896
897 protected void registerMessageListeners(
898 String jobName, String groupName, String destinationName,
899 Message message)
900 throws SchedulerException {
901
902 String messageListenerClassName = message.getString(
903 MESSAGE_LISTENER_CLASS_NAME);
904
905 if (Validator.isNull(messageListenerClassName)) {
906 return;
907 }
908
909 String portletId = message.getString(PORTLET_ID);
910
911 ClassLoader classLoader = null;
912
913 if (Validator.isNull(portletId)) {
914 classLoader = PACLClassLoaderUtil.getPortalClassLoader();
915 }
916 else {
917 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
918 }
919
920 if (classLoader == null) {
921 throw new SchedulerException(
922 "Unable to find class loader for portlet " + portletId);
923 }
924
925 MessageListener schedulerEventListener = getMessageListener(
926 messageListenerClassName, classLoader);
927
928 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
929 new SchedulerEventMessageListenerWrapper();
930
931 schedulerEventListenerWrapper.setGroupName(groupName);
932 schedulerEventListenerWrapper.setJobName(jobName);
933 schedulerEventListenerWrapper.setMessageListener(
934 schedulerEventListener);
935
936 schedulerEventListenerWrapper.afterPropertiesSet();
937
938 MessageBusUtil.registerMessageListener(
939 destinationName, schedulerEventListenerWrapper);
940
941 message.put(
942 MESSAGE_LISTENER_UUID,
943 schedulerEventListenerWrapper.getMessageListenerUUID());
944
945 message.put(RECEIVER_KEY, getFullName(jobName, groupName));
946 }
947
948 protected void schedule(
949 Scheduler scheduler, StorageType storageType, Trigger trigger,
950 String description, String destinationName, Message message)
951 throws Exception {
952
953 try {
954 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
955
956 jobBuilder.withIdentity(trigger.getJobKey());
957
958 JobDetail jobDetail = jobBuilder.build();
959
960 JobDataMap jobDataMap = jobDetail.getJobDataMap();
961
962 jobDataMap.put(DESCRIPTION, description);
963 jobDataMap.put(DESTINATION_NAME, destinationName);
964 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
965 jobDataMap.put(STORAGE_TYPE, storageType.toString());
966
967 JobState jobState = new JobState(
968 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
969
970 jobDataMap.put(
971 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
972
973 unregisterMessageListener(scheduler, trigger.getJobKey());
974
975 synchronized (this) {
976 scheduler.deleteJob(trigger.getJobKey());
977 scheduler.scheduleJob(jobDetail, trigger);
978 }
979 }
980 catch (ObjectAlreadyExistsException oaee) {
981 if (_log.isInfoEnabled()) {
982 _log.info("Message is already scheduled");
983 }
984 }
985 }
986
987 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
988 throws Exception {
989
990 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
991
992 if (jobDetail == null) {
993 return;
994 }
995
996 JobDataMap jobDataMap = jobDetail.getJobDataMap();
997
998 if (jobDataMap == null) {
999 return;
1000 }
1001
1002 Message message = getMessage(jobDataMap);
1003
1004 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1005
1006 if (messageListenerUUID == null) {
1007 return;
1008 }
1009
1010 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1011
1012 MessageBus messageBus = MessageBusUtil.getMessageBus();
1013
1014 Destination destination = messageBus.getDestination(destinationName);
1015
1016 Set<MessageListener> messageListeners =
1017 destination.getMessageListeners();
1018
1019 for (MessageListener messageListener : messageListeners) {
1020 if (!(messageListener instanceof InvokerMessageListener)) {
1021 continue;
1022 }
1023
1024 InvokerMessageListener invokerMessageListener =
1025 (InvokerMessageListener)messageListener;
1026
1027 messageListener = invokerMessageListener.getMessageListener();
1028
1029 if (!(messageListener instanceof
1030 SchedulerEventMessageListenerWrapper)) {
1031
1032 continue;
1033 }
1034
1035 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1036 (SchedulerEventMessageListenerWrapper)messageListener;
1037
1038 if (messageListenerUUID.equals(
1039 schedulerMessageListener.getMessageListenerUUID())) {
1040
1041 messageBus.unregisterMessageListener(
1042 destinationName, schedulerMessageListener);
1043
1044 return;
1045 }
1046 }
1047 }
1048
1049 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1050 throws Exception {
1051
1052 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1053
1054 TriggerKey triggerKey = new TriggerKey(
1055 jobKey.getName(), jobKey.getGroup());
1056
1057 if (jobDetail == null) {
1058 return;
1059 }
1060
1061 unregisterMessageListener(scheduler, jobKey);
1062
1063 if (scheduler == _memoryScheduler) {
1064 scheduler.unscheduleJob(triggerKey);
1065
1066 return;
1067 }
1068
1069 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1070
1071 JobState jobState = getJobState(jobDataMap);
1072
1073 Trigger trigger = scheduler.getTrigger(triggerKey);
1074
1075 jobState.setTriggerDate(END_TIME, new Date());
1076 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1077 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1078 jobState.setTriggerDate(
1079 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1080 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1081
1082 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1083
1084 jobState.clearExceptions();
1085
1086 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1087
1088 scheduler.unscheduleJob(triggerKey);
1089
1090 scheduler.addJob(jobDetail, true);
1091 }
1092
1093 protected void update(
1094 Scheduler scheduler,
1095 com.liferay.portal.kernel.scheduler.Trigger trigger)
1096 throws Exception {
1097
1098 Trigger quartzTrigger = getQuartzTrigger(trigger);
1099
1100 if (quartzTrigger == null) {
1101 return;
1102 }
1103
1104 TriggerKey triggerKey = quartzTrigger.getKey();
1105
1106 if (scheduler.getTrigger(triggerKey) != null) {
1107 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1108 }
1109 else {
1110 JobKey jobKey = quartzTrigger.getJobKey();
1111
1112 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1113
1114 if (jobDetail == null) {
1115 return;
1116 }
1117
1118 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1119
1120 synchronized (this) {
1121 scheduler.deleteJob(jobKey);
1122 scheduler.scheduleJob(jobDetail, quartzTrigger);
1123 }
1124 }
1125 }
1126
1127 protected void updateJobState(
1128 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1129 boolean suppressError)
1130 throws Exception {
1131
1132 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1133
1134 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1135
1136 JobState jobState = getJobState(jobDataMap);
1137
1138 if (triggerState != null) {
1139 jobState.setTriggerState(triggerState);
1140 }
1141
1142 if (suppressError) {
1143 jobState.clearExceptions();
1144 }
1145
1146 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1147
1148 scheduler.addJob(jobDetail, true);
1149 }
1150
1151 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1152 protected QuartzLocalService quartzLocalService;
1153
1154 private static Log _log = LogFactoryUtil.getLog(
1155 QuartzSchedulerEngine.class);
1156
1157 private Scheduler _memoryScheduler;
1158 private Scheduler _persistedScheduler;
1159
1160 }