001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.TriggerType;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.CharPool;
044 import com.liferay.portal.kernel.util.ClassLoaderPool;
045 import com.liferay.portal.kernel.util.ProxyUtil;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.scheduler.job.MessageSenderJob;
049 import com.liferay.portal.service.QuartzLocalService;
050 import com.liferay.portal.util.ClassLoaderUtil;
051 import com.liferay.portal.util.PropsUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.util.ArrayList;
055 import java.util.Collections;
056 import java.util.Date;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Properties;
060 import java.util.Set;
061
062 import org.quartz.CronScheduleBuilder;
063 import org.quartz.CronTrigger;
064 import org.quartz.JobBuilder;
065 import org.quartz.JobDataMap;
066 import org.quartz.JobDetail;
067 import org.quartz.JobKey;
068 import org.quartz.ObjectAlreadyExistsException;
069 import org.quartz.Scheduler;
070 import org.quartz.SimpleScheduleBuilder;
071 import org.quartz.SimpleTrigger;
072 import org.quartz.Trigger;
073 import org.quartz.TriggerBuilder;
074 import org.quartz.TriggerKey;
075 import org.quartz.impl.StdSchedulerFactory;
076 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
077 import org.quartz.impl.matchers.GroupMatcher;
078
079
086 public class QuartzSchedulerEngine implements SchedulerEngine {
087
088 public void afterPropertiesSet() {
089 if (!PropsValues.SCHEDULER_ENABLED) {
090 return;
091 }
092
093 try {
094 quartzLocalService.checkQuartzTables();
095
096 _persistedScheduler = initializeScheduler(
097 "persisted.scheduler.", true);
098
099 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
100 }
101 catch (Exception e) {
102 _log.error("Unable to initialize engine", e);
103 }
104 }
105
106 public void delete(String groupName) throws SchedulerException {
107 if (!PropsValues.SCHEDULER_ENABLED) {
108 return;
109 }
110
111 try {
112 Scheduler scheduler = getScheduler(groupName);
113
114 groupName = fixMaxLength(
115 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
116
117 Set<JobKey> jobKeys = scheduler.getJobKeys(
118 GroupMatcher.jobGroupEquals(groupName));
119
120 for (JobKey jobKey : jobKeys) {
121 unregisterMessageListener(scheduler, jobKey);
122
123 scheduler.deleteJob(jobKey);
124 }
125 }
126 catch (Exception e) {
127 throw new SchedulerException(
128 "Unable to delete jobs in group " + groupName, e);
129 }
130 }
131
132 public void delete(String jobName, String groupName)
133 throws SchedulerException {
134
135 if (!PropsValues.SCHEDULER_ENABLED) {
136 return;
137 }
138
139 try {
140 Scheduler scheduler = getScheduler(groupName);
141
142 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
143 groupName = fixMaxLength(
144 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
145
146 JobKey jobKey = new JobKey(jobName, groupName);
147
148 unregisterMessageListener(scheduler, jobKey);
149
150 scheduler.deleteJob(jobKey);
151 }
152 catch (Exception e) {
153 throw new SchedulerException(
154 "Unable to delete job {jobName=" + jobName + ", groupName=" +
155 groupName + "}",
156 e);
157 }
158 }
159
160 public void destroy() {
161 try {
162 shutdown();
163 }
164 catch (SchedulerException se) {
165 if (_log.isWarnEnabled()) {
166 _log.warn("Unable to shutdown", se);
167 }
168 }
169 }
170
171 public SchedulerResponse getScheduledJob(String jobName, String groupName)
172 throws SchedulerException {
173
174 if (!PropsValues.SCHEDULER_ENABLED) {
175 return null;
176 }
177
178 try {
179 Scheduler scheduler = getScheduler(groupName);
180
181 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
182 groupName = fixMaxLength(
183 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
184
185 JobKey jobKey = new JobKey(jobName, groupName);
186
187 return getScheduledJob(scheduler, jobKey);
188 }
189 catch (Exception e) {
190 throw new SchedulerException(
191 "Unable to get job {jobName=" + jobName + ", groupName=" +
192 groupName + "}",
193 e);
194 }
195 }
196
197 public List<SchedulerResponse> getScheduledJobs()
198 throws SchedulerException {
199
200 if (!PropsValues.SCHEDULER_ENABLED) {
201 return Collections.emptyList();
202 }
203
204 try {
205 List<String> groupNames = _persistedScheduler.getJobGroupNames();
206
207 List<SchedulerResponse> schedulerResponses =
208 new ArrayList<SchedulerResponse>();
209
210 for (String groupName : groupNames) {
211 schedulerResponses.addAll(
212 getScheduledJobs(_persistedScheduler, groupName));
213 }
214
215 groupNames = _memoryScheduler.getJobGroupNames();
216
217 for (String groupName : groupNames) {
218 schedulerResponses.addAll(
219 getScheduledJobs(_memoryScheduler, groupName));
220 }
221
222 return schedulerResponses;
223 }
224 catch (Exception e) {
225 throw new SchedulerException("Unable to get jobs", e);
226 }
227 }
228
229 public List<SchedulerResponse> getScheduledJobs(String groupName)
230 throws SchedulerException {
231
232 if (!PropsValues.SCHEDULER_ENABLED) {
233 return Collections.emptyList();
234 }
235
236 try {
237 Scheduler scheduler = getScheduler(groupName);
238
239 return getScheduledJobs(scheduler, groupName);
240 }
241 catch (Exception e) {
242 throw new SchedulerException(
243 "Unable to get jobs in group " + groupName, e);
244 }
245 }
246
247 public void pause(String groupName) throws SchedulerException {
248 if (!PropsValues.SCHEDULER_ENABLED) {
249 return;
250 }
251
252 try {
253 Scheduler scheduler = getScheduler(groupName);
254
255 groupName = fixMaxLength(
256 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
257
258 Set<JobKey> jobKeys = scheduler.getJobKeys(
259 GroupMatcher.jobGroupEquals(groupName));
260
261 for (JobKey jobKey : jobKeys) {
262 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
263 }
264
265 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
266 }
267 catch (Exception e) {
268 throw new SchedulerException(
269 "Unable to pause jobs in group " + groupName, e);
270 }
271 }
272
273 public void pause(String jobName, String groupName)
274 throws SchedulerException {
275
276 if (!PropsValues.SCHEDULER_ENABLED) {
277 return;
278 }
279
280 try {
281 Scheduler scheduler = getScheduler(groupName);
282
283 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
284 groupName = fixMaxLength(
285 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
286
287 JobKey jobKey = new JobKey(jobName, groupName);
288
289 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
290
291 scheduler.pauseJob(jobKey);
292 }
293 catch (Exception e) {
294 throw new SchedulerException(
295 "Unable to pause job {jobName=" + jobName + ", groupName=" +
296 groupName + "}",
297 e);
298 }
299 }
300
301 public void resume(String groupName) throws SchedulerException {
302 if (!PropsValues.SCHEDULER_ENABLED) {
303 return;
304 }
305
306 try {
307 Scheduler scheduler = getScheduler(groupName);
308
309 groupName = fixMaxLength(
310 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
311
312 Set<JobKey> jobKeys = scheduler.getJobKeys(
313 GroupMatcher.jobGroupEquals(groupName));
314
315 for (JobKey jobKey : jobKeys) {
316 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
317 }
318
319 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
320 }
321 catch (Exception e) {
322 throw new SchedulerException(
323 "Unable to resume jobs in group " + groupName, e);
324 }
325 }
326
327 public void resume(String jobName, String groupName)
328 throws SchedulerException {
329
330 if (!PropsValues.SCHEDULER_ENABLED) {
331 return;
332 }
333
334 try {
335 Scheduler scheduler = getScheduler(groupName);
336
337 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
338 groupName = fixMaxLength(
339 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
340
341 JobKey jobKey = new JobKey(jobName, groupName);
342
343 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
344
345 scheduler.resumeJob(jobKey);
346 }
347 catch (Exception e) {
348 throw new SchedulerException(
349 "Unable to resume job {jobName=" + jobName + ", groupName=" +
350 groupName + "}",
351 e);
352 }
353 }
354
355 public void schedule(
356 com.liferay.portal.kernel.scheduler.Trigger trigger,
357 String description, String destination, Message message)
358 throws SchedulerException {
359
360 if (!PropsValues.SCHEDULER_ENABLED) {
361 return;
362 }
363
364 try {
365 Scheduler scheduler = getScheduler(trigger.getGroupName());
366
367 StorageType storageType = getStorageType(trigger.getGroupName());
368
369 trigger = TriggerFactoryUtil.buildTrigger(
370 trigger.getTriggerType(), trigger.getJobName(),
371 getOriginalGroupName(trigger.getGroupName()),
372 trigger.getStartDate(), trigger.getEndDate(),
373 trigger.getTriggerContent());
374
375 Trigger quartzTrigger = getQuartzTrigger(trigger);
376
377 if (quartzTrigger == null) {
378 return;
379 }
380
381 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
382
383 if (message == null) {
384 message = new Message();
385 }
386 else {
387 message = message.clone();
388 }
389
390 registerMessageListeners(
391 trigger.getJobName(), trigger.getGroupName(), destination,
392 message);
393
394 schedule(
395 scheduler, storageType, quartzTrigger, description, destination,
396 message);
397 }
398 catch (RuntimeException re) {
399
400
401
402
403 }
404 catch (Exception e) {
405 throw new SchedulerException("Unable to schedule job", e);
406 }
407 }
408
409 public void shutdown() throws SchedulerException {
410 if (!PropsValues.SCHEDULER_ENABLED) {
411 return;
412 }
413
414 try {
415 if (!_persistedScheduler.isShutdown()) {
416 _persistedScheduler.shutdown(false);
417 }
418
419 if (!_memoryScheduler.isShutdown()) {
420 _memoryScheduler.shutdown(false);
421 }
422 }
423 catch (Exception e) {
424 throw new SchedulerException("Unable to shutdown scheduler", e);
425 }
426 }
427
428 public void start() throws SchedulerException {
429 if (!PropsValues.SCHEDULER_ENABLED) {
430 return;
431 }
432
433 try {
434 _persistedScheduler.start();
435
436 initJobState();
437
438 _memoryScheduler.start();
439 }
440 catch (Exception e) {
441 throw new SchedulerException("Unable to start scheduler", e);
442 }
443 }
444
445 public void suppressError(String jobName, String groupName)
446 throws SchedulerException {
447
448 if (!PropsValues.SCHEDULER_ENABLED) {
449 return;
450 }
451
452 try {
453 Scheduler scheduler = getScheduler(groupName);
454
455 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
456 groupName = fixMaxLength(
457 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
458
459 JobKey jobKey = new JobKey(jobName, groupName);
460
461 updateJobState(scheduler, jobKey, null, true);
462 }
463 catch (Exception e) {
464 throw new SchedulerException(
465 "Unable to suppress error for job {jobName=" + jobName +
466 ", groupName=" + groupName + "}",
467 e);
468 }
469 }
470
471 public void unschedule(String groupName) throws SchedulerException {
472 if (!PropsValues.SCHEDULER_ENABLED) {
473 return;
474 }
475
476 try {
477 Scheduler scheduler = getScheduler(groupName);
478
479 groupName = fixMaxLength(
480 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
481
482 Set<JobKey> jobKeys = scheduler.getJobKeys(
483 GroupMatcher.jobGroupEquals(groupName));
484
485 for (JobKey jobKey : jobKeys) {
486 unschedule(scheduler, jobKey);
487 }
488 }
489 catch (Exception e) {
490 throw new SchedulerException(
491 "Unable to unschedule jobs in group " + groupName, e);
492 }
493 }
494
495 public void unschedule(String jobName, String groupName)
496 throws SchedulerException {
497
498 if (!PropsValues.SCHEDULER_ENABLED) {
499 return;
500 }
501
502 try {
503 Scheduler scheduler = getScheduler(groupName);
504
505 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
506 groupName = fixMaxLength(
507 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
508
509 JobKey jobKey = new JobKey(jobName, groupName);
510
511 unschedule(scheduler, jobKey);
512 }
513 catch (Exception e) {
514 throw new SchedulerException(
515 "Unable to unschedule job {jobName=" + jobName +
516 ", groupName=" + groupName + "}",
517 e);
518 }
519 }
520
521 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
522 throws SchedulerException {
523
524 if (!PropsValues.SCHEDULER_ENABLED) {
525 return;
526 }
527
528 try {
529 Scheduler scheduler = getScheduler(trigger.getGroupName());
530
531 trigger = TriggerFactoryUtil.buildTrigger(
532 trigger.getTriggerType(), trigger.getJobName(),
533 getOriginalGroupName(trigger.getGroupName()),
534 trigger.getStartDate(), trigger.getEndDate(),
535 trigger.getTriggerContent());
536
537 update(scheduler, trigger);
538 }
539 catch (Exception e) {
540 throw new SchedulerException("Unable to update trigger", e);
541 }
542 }
543
544 protected String fixMaxLength(String argument, int maxLength) {
545 if (argument == null) {
546 return null;
547 }
548
549 if (argument.length() > maxLength) {
550 argument = argument.substring(0, maxLength);
551 }
552
553 return argument;
554 }
555
556 protected String getFullName(String jobName, String groupName) {
557 return groupName.concat(StringPool.PERIOD).concat(jobName);
558 }
559
560 protected JobState getJobState(JobDataMap jobDataMap) {
561 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
562 JOB_STATE);
563
564 return JobStateSerializeUtil.deserialize(jobStateMap);
565 }
566
567 protected Message getMessage(JobDataMap jobDataMap) {
568 String messageJSON = (String)jobDataMap.get(MESSAGE);
569
570 return (Message)JSONFactoryUtil.deserialize(messageJSON);
571 }
572
573 protected MessageListener getMessageListener(
574 String messageListenerClassName, ClassLoader classLoader)
575 throws SchedulerException {
576
577 MessageListener schedulerEventListener = null;
578
579 try {
580 Class<? extends MessageListener> clazz =
581 (Class<? extends MessageListener>)classLoader.loadClass(
582 messageListenerClassName);
583
584 schedulerEventListener = clazz.newInstance();
585
586 schedulerEventListener =
587 (MessageListener)ProxyUtil.newProxyInstance(
588 classLoader, new Class<?>[] {MessageListener.class},
589 new ClassLoaderBeanHandler(
590 schedulerEventListener, classLoader));
591 }
592 catch (Exception e) {
593 throw new SchedulerException(
594 "Unable to register message listener with name " +
595 messageListenerClassName,
596 e);
597 }
598
599 return schedulerEventListener;
600 }
601
602 protected String getOriginalGroupName(String groupName) {
603 int pos = groupName.indexOf(CharPool.POUND);
604
605 return groupName.substring(pos + 1);
606 }
607
608 protected Trigger getQuartzTrigger(
609 com.liferay.portal.kernel.scheduler.Trigger trigger)
610 throws SchedulerException {
611
612 if (trigger == null) {
613 return null;
614 }
615
616 Date endDate = trigger.getEndDate();
617 String jobName = fixMaxLength(
618 trigger.getJobName(), JOB_NAME_MAX_LENGTH);
619 String groupName = fixMaxLength(
620 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH);
621
622 Date startDate = trigger.getStartDate();
623
624 if (startDate == null) {
625 startDate = new Date(System.currentTimeMillis());
626 }
627
628 Trigger quartzTrigger = null;
629
630 TriggerType triggerType = trigger.getTriggerType();
631
632 if (triggerType.equals(TriggerType.CRON)) {
633 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
634
635 triggerBuilder.endAt(endDate);
636 triggerBuilder.forJob(jobName, groupName);
637 triggerBuilder.startAt(startDate);
638 triggerBuilder.withIdentity(jobName, groupName);
639
640 CronScheduleBuilder cronScheduleBuilder =
641 CronScheduleBuilder.cronSchedule(
642 (String)trigger.getTriggerContent());
643
644 triggerBuilder.withSchedule(cronScheduleBuilder);
645
646 quartzTrigger = triggerBuilder.build();
647 }
648 else if (triggerType.equals(TriggerType.SIMPLE)) {
649 long interval = (Long)trigger.getTriggerContent();
650
651 if (interval <= 0) {
652 if (_log.isDebugEnabled()) {
653 _log.debug(
654 "Not scheduling " + trigger.getJobName() +
655 " because interval is less than or equal to 0");
656 }
657
658 return null;
659 }
660
661 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
662
663 triggerBuilder.endAt(endDate);
664 triggerBuilder.forJob(jobName, groupName);
665 triggerBuilder.startAt(startDate);
666 triggerBuilder.withIdentity(jobName, groupName);
667
668 SimpleScheduleBuilder simpleScheduleBuilder =
669 SimpleScheduleBuilder.simpleSchedule();
670
671 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
672 simpleScheduleBuilder.withRepeatCount(
673 SimpleTrigger.REPEAT_INDEFINITELY);
674
675 triggerBuilder.withSchedule(simpleScheduleBuilder);
676
677 quartzTrigger = triggerBuilder.build();
678 }
679 else {
680 throw new SchedulerException(
681 "Unknown trigger type " + trigger.getTriggerType());
682 }
683
684 return quartzTrigger;
685 }
686
687 protected SchedulerResponse getScheduledJob(
688 Scheduler scheduler, JobKey jobKey)
689 throws Exception {
690
691 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
692
693 if (jobDetail == null) {
694 return null;
695 }
696
697 JobDataMap jobDataMap = jobDetail.getJobDataMap();
698
699 String description = jobDataMap.getString(DESCRIPTION);
700 String destinationName = jobDataMap.getString(DESTINATION_NAME);
701 Message message = getMessage(jobDataMap);
702 StorageType storageType = StorageType.valueOf(
703 jobDataMap.getString(STORAGE_TYPE));
704
705 SchedulerResponse schedulerResponse = null;
706
707 String jobName = jobKey.getName();
708 String groupName = jobKey.getGroup();
709
710 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
711
712 Trigger trigger = scheduler.getTrigger(triggerKey);
713
714 JobState jobState = getJobState(jobDataMap);
715
716 message.put(JOB_STATE, jobState);
717
718 if (trigger == null) {
719 schedulerResponse = new SchedulerResponse();
720
721 schedulerResponse.setDescription(description);
722 schedulerResponse.setDestinationName(destinationName);
723 schedulerResponse.setGroupName(groupName);
724 schedulerResponse.setJobName(jobName);
725 schedulerResponse.setMessage(message);
726 schedulerResponse.setStorageType(storageType);
727 }
728 else {
729 message.put(END_TIME, trigger.getEndTime());
730 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
731 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
732 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
733 message.put(START_TIME, trigger.getStartTime());
734
735 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
736
737 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
738
739 schedulerResponse = new SchedulerResponse();
740
741 schedulerResponse.setDescription(description);
742 schedulerResponse.setDestinationName(destinationName);
743 schedulerResponse.setMessage(message);
744 schedulerResponse.setStorageType(storageType);
745 schedulerResponse.setTrigger(
746 new com.liferay.portal.kernel.scheduler.CronTrigger(
747 jobName, groupName, cronTrigger.getStartTime(),
748 cronTrigger.getEndTime(),
749 cronTrigger.getCronExpression()));
750 }
751 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
752 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
753
754 schedulerResponse = new SchedulerResponse();
755
756 schedulerResponse.setDescription(description);
757 schedulerResponse.setDestinationName(destinationName);
758 schedulerResponse.setMessage(message);
759 schedulerResponse.setStorageType(storageType);
760 schedulerResponse.setTrigger(
761 new IntervalTrigger(
762 jobName, groupName, simpleTrigger.getStartTime(),
763 simpleTrigger.getEndTime(),
764 simpleTrigger.getRepeatInterval()));
765 }
766 }
767
768 return schedulerResponse;
769 }
770
771 protected List<SchedulerResponse> getScheduledJobs(
772 Scheduler scheduler, String groupName)
773 throws Exception {
774
775 groupName = fixMaxLength(
776 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
777
778 List<SchedulerResponse> schedulerResponses =
779 new ArrayList<SchedulerResponse>();
780
781 Set<JobKey> jobKeys = scheduler.getJobKeys(
782 GroupMatcher.jobGroupEquals(groupName));
783
784 for (JobKey jobKey : jobKeys) {
785 SchedulerResponse schedulerResponse = getScheduledJob(
786 scheduler, jobKey);
787
788 if (schedulerResponse != null) {
789 schedulerResponses.add(schedulerResponse);
790 }
791 }
792
793 return schedulerResponses;
794 }
795
796 protected Scheduler getScheduler(String groupName) throws Exception {
797 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
798 return _persistedScheduler;
799 }
800 else {
801 return _memoryScheduler;
802 }
803 }
804
805 protected StorageType getStorageType(String groupName) {
806 int pos = groupName.indexOf(CharPool.POUND);
807
808 String storageTypeString = groupName.substring(0, pos);
809
810 return StorageType.valueOf(storageTypeString);
811 }
812
813 protected Scheduler initializeScheduler(
814 String propertiesPrefix, boolean useQuartzCluster)
815 throws Exception {
816
817 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
818
819 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
820
821 if (useQuartzCluster) {
822 DB db = DBFactoryUtil.getDB();
823
824 String dbType = db.getType();
825
826 if (dbType.equals(DB.TYPE_SQLSERVER)) {
827 properties.setProperty(
828 "org.quartz.jobStore.lockHandler.class",
829 UpdateLockRowSemaphore.class.getName());
830 }
831
832 if (PropsValues.CLUSTER_LINK_ENABLED) {
833 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
834 _log.error("Unable to cluster scheduler on Hypersonic");
835 }
836 else {
837 properties.put(
838 "org.quartz.jobStore.isClustered",
839 Boolean.TRUE.toString());
840 }
841 }
842 }
843
844 schedulerFactory.initialize(properties);
845
846 return schedulerFactory.getScheduler();
847 }
848
849 protected void initJobState() throws Exception {
850 List<String> groupNames = _persistedScheduler.getJobGroupNames();
851
852 for (String groupName : groupNames) {
853 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
854 GroupMatcher.jobGroupEquals(groupName));
855
856 for (JobKey jobKey : jobkeys) {
857 Trigger trigger = _persistedScheduler.getTrigger(
858 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
859
860 if (trigger != null) {
861 continue;
862 }
863
864 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
865
866 JobDataMap jobDataMap = jobDetail.getJobDataMap();
867
868 Message message = getMessage(jobDataMap);
869
870 message.put(JOB_NAME, jobKey.getName());
871 message.put(GROUP_NAME, jobKey.getGroup());
872
873 SchedulerEngineHelperUtil.auditSchedulerJobs(
874 message, TriggerState.EXPIRED);
875
876 _persistedScheduler.deleteJob(jobKey);
877 }
878 }
879 }
880
881 protected void registerMessageListeners(
882 String jobName, String groupName, String destinationName,
883 Message message)
884 throws SchedulerException {
885
886 String messageListenerClassName = message.getString(
887 MESSAGE_LISTENER_CLASS_NAME);
888
889 if (Validator.isNull(messageListenerClassName)) {
890 return;
891 }
892
893 String portletId = message.getString(PORTLET_ID);
894
895 ClassLoader classLoader = null;
896
897 if (Validator.isNull(portletId)) {
898 classLoader = ClassLoaderUtil.getPortalClassLoader();
899 }
900 else {
901 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
902
903 if (classLoader == null) {
904
905
906
907
908
909 classLoader = ClassLoaderPool.getClassLoader(portletId);
910 }
911 }
912
913 if (classLoader == null) {
914 throw new SchedulerException(
915 "Unable to find class loader for portlet " + portletId);
916 }
917
918 MessageListener schedulerEventListener = getMessageListener(
919 messageListenerClassName, classLoader);
920
921 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
922 new SchedulerEventMessageListenerWrapper();
923
924 schedulerEventListenerWrapper.setGroupName(groupName);
925 schedulerEventListenerWrapper.setJobName(jobName);
926 schedulerEventListenerWrapper.setMessageListener(
927 schedulerEventListener);
928
929 schedulerEventListenerWrapper.afterPropertiesSet();
930
931 MessageBusUtil.registerMessageListener(
932 destinationName, schedulerEventListenerWrapper);
933
934 message.put(
935 MESSAGE_LISTENER_UUID,
936 schedulerEventListenerWrapper.getMessageListenerUUID());
937
938 message.put(RECEIVER_KEY, getFullName(jobName, groupName));
939 }
940
941 protected void schedule(
942 Scheduler scheduler, StorageType storageType, Trigger trigger,
943 String description, String destinationName, Message message)
944 throws Exception {
945
946 try {
947 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
948
949 jobBuilder.withIdentity(trigger.getJobKey());
950
951 JobDetail jobDetail = jobBuilder.build();
952
953 JobDataMap jobDataMap = jobDetail.getJobDataMap();
954
955 jobDataMap.put(DESCRIPTION, description);
956 jobDataMap.put(DESTINATION_NAME, destinationName);
957 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
958 jobDataMap.put(STORAGE_TYPE, storageType.toString());
959
960 JobState jobState = new JobState(
961 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
962
963 jobDataMap.put(
964 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
965
966 unregisterMessageListener(scheduler, trigger.getJobKey());
967
968 synchronized (this) {
969 scheduler.deleteJob(trigger.getJobKey());
970 scheduler.scheduleJob(jobDetail, trigger);
971 }
972 }
973 catch (ObjectAlreadyExistsException oaee) {
974 if (_log.isInfoEnabled()) {
975 _log.info("Message is already scheduled");
976 }
977 }
978 }
979
980 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
981 throws Exception {
982
983 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
984
985 if (jobDetail == null) {
986 return;
987 }
988
989 JobDataMap jobDataMap = jobDetail.getJobDataMap();
990
991 if (jobDataMap == null) {
992 return;
993 }
994
995 Message message = getMessage(jobDataMap);
996
997 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
998
999 if (messageListenerUUID == null) {
1000 return;
1001 }
1002
1003 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1004
1005 MessageBus messageBus = MessageBusUtil.getMessageBus();
1006
1007 Destination destination = messageBus.getDestination(destinationName);
1008
1009 Set<MessageListener> messageListeners =
1010 destination.getMessageListeners();
1011
1012 for (MessageListener messageListener : messageListeners) {
1013 if (!(messageListener instanceof InvokerMessageListener)) {
1014 continue;
1015 }
1016
1017 InvokerMessageListener invokerMessageListener =
1018 (InvokerMessageListener)messageListener;
1019
1020 messageListener = invokerMessageListener.getMessageListener();
1021
1022 if (!(messageListener instanceof
1023 SchedulerEventMessageListenerWrapper)) {
1024
1025 continue;
1026 }
1027
1028 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1029 (SchedulerEventMessageListenerWrapper)messageListener;
1030
1031 if (messageListenerUUID.equals(
1032 schedulerMessageListener.getMessageListenerUUID())) {
1033
1034 messageBus.unregisterMessageListener(
1035 destinationName, schedulerMessageListener);
1036
1037 return;
1038 }
1039 }
1040 }
1041
1042 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1043 throws Exception {
1044
1045 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1046
1047 TriggerKey triggerKey = new TriggerKey(
1048 jobKey.getName(), jobKey.getGroup());
1049
1050 if (jobDetail == null) {
1051 return;
1052 }
1053
1054 unregisterMessageListener(scheduler, jobKey);
1055
1056 if (scheduler == _memoryScheduler) {
1057 scheduler.unscheduleJob(triggerKey);
1058
1059 return;
1060 }
1061
1062 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1063
1064 JobState jobState = getJobState(jobDataMap);
1065
1066 Trigger trigger = scheduler.getTrigger(triggerKey);
1067
1068 jobState.setTriggerDate(END_TIME, new Date());
1069 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1070 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1071 jobState.setTriggerDate(
1072 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1073 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1074
1075 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1076
1077 jobState.clearExceptions();
1078
1079 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1080
1081 scheduler.unscheduleJob(triggerKey);
1082
1083 scheduler.addJob(jobDetail, true);
1084 }
1085
1086 protected void update(
1087 Scheduler scheduler,
1088 com.liferay.portal.kernel.scheduler.Trigger trigger)
1089 throws Exception {
1090
1091 Trigger quartzTrigger = getQuartzTrigger(trigger);
1092
1093 if (quartzTrigger == null) {
1094 return;
1095 }
1096
1097 TriggerKey triggerKey = quartzTrigger.getKey();
1098
1099 if (scheduler.getTrigger(triggerKey) != null) {
1100 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1101 }
1102 else {
1103 JobKey jobKey = quartzTrigger.getJobKey();
1104
1105 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1106
1107 if (jobDetail == null) {
1108 return;
1109 }
1110
1111 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1112
1113 synchronized (this) {
1114 scheduler.deleteJob(jobKey);
1115 scheduler.scheduleJob(jobDetail, quartzTrigger);
1116 }
1117 }
1118 }
1119
1120 protected void updateJobState(
1121 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1122 boolean suppressError)
1123 throws Exception {
1124
1125 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1126
1127 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1128
1129 JobState jobState = getJobState(jobDataMap);
1130
1131 if (triggerState != null) {
1132 jobState.setTriggerState(triggerState);
1133 }
1134
1135 if (suppressError) {
1136 jobState.clearExceptions();
1137 }
1138
1139 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1140
1141 scheduler.addJob(jobDetail, true);
1142 }
1143
1144 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1145 protected QuartzLocalService quartzLocalService;
1146
1147 private static Log _log = LogFactoryUtil.getLog(
1148 QuartzSchedulerEngine.class);
1149
1150 private Scheduler _memoryScheduler;
1151 private Scheduler _persistedScheduler;
1152
1153 }