001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.messaging.ReceiverKey;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.CharPool;
044 import com.liferay.portal.kernel.util.ClassLoaderPool;
045 import com.liferay.portal.kernel.util.ProxyUtil;
046 import com.liferay.portal.kernel.util.Validator;
047 import com.liferay.portal.scheduler.job.MessageSenderJob;
048 import com.liferay.portal.service.QuartzLocalService;
049 import com.liferay.portal.util.ClassLoaderUtil;
050 import com.liferay.portal.util.PropsUtil;
051 import com.liferay.portal.util.PropsValues;
052
053 import java.util.ArrayList;
054 import java.util.Collections;
055 import java.util.Date;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.Properties;
059 import java.util.Set;
060
061 import org.quartz.CronTrigger;
062 import org.quartz.JobBuilder;
063 import org.quartz.JobDataMap;
064 import org.quartz.JobDetail;
065 import org.quartz.JobKey;
066 import org.quartz.ObjectAlreadyExistsException;
067 import org.quartz.Scheduler;
068 import org.quartz.SimpleTrigger;
069 import org.quartz.Trigger;
070 import org.quartz.TriggerKey;
071 import org.quartz.impl.StdSchedulerFactory;
072 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
073 import org.quartz.impl.matchers.GroupMatcher;
074
075
082 public class QuartzSchedulerEngine implements SchedulerEngine {
083
084 public void afterPropertiesSet() {
085 if (!PropsValues.SCHEDULER_ENABLED) {
086 return;
087 }
088
089 try {
090 quartzLocalService.checkQuartzTables();
091
092 _persistedScheduler = initializeScheduler(
093 "persisted.scheduler.", true);
094
095 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
096 }
097 catch (Exception e) {
098 _log.error("Unable to initialize engine", e);
099 }
100 }
101
102 @Override
103 public void delete(String groupName) throws SchedulerException {
104 if (!PropsValues.SCHEDULER_ENABLED) {
105 return;
106 }
107
108 try {
109 Scheduler scheduler = getScheduler(groupName);
110
111 groupName = fixMaxLength(
112 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
113
114 Set<JobKey> jobKeys = scheduler.getJobKeys(
115 GroupMatcher.jobGroupEquals(groupName));
116
117 for (JobKey jobKey : jobKeys) {
118 unregisterMessageListener(scheduler, jobKey);
119
120 scheduler.deleteJob(jobKey);
121 }
122 }
123 catch (Exception e) {
124 throw new SchedulerException(
125 "Unable to delete jobs in group " + groupName, e);
126 }
127 }
128
129 @Override
130 public void delete(String jobName, String groupName)
131 throws SchedulerException {
132
133 if (!PropsValues.SCHEDULER_ENABLED) {
134 return;
135 }
136
137 try {
138 Scheduler scheduler = getScheduler(groupName);
139
140 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
141 groupName = fixMaxLength(
142 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
143
144 JobKey jobKey = new JobKey(jobName, groupName);
145
146 unregisterMessageListener(scheduler, jobKey);
147
148 scheduler.deleteJob(jobKey);
149 }
150 catch (Exception e) {
151 throw new SchedulerException(
152 "Unable to delete job {jobName=" + jobName + ", groupName=" +
153 groupName + "}",
154 e);
155 }
156 }
157
158 public void destroy() {
159 try {
160 shutdown();
161 }
162 catch (SchedulerException se) {
163 if (_log.isWarnEnabled()) {
164 _log.warn("Unable to shutdown", se);
165 }
166 }
167 }
168
169 @Override
170 public SchedulerResponse getScheduledJob(String jobName, String groupName)
171 throws SchedulerException {
172
173 if (!PropsValues.SCHEDULER_ENABLED) {
174 return null;
175 }
176
177 try {
178 Scheduler scheduler = getScheduler(groupName);
179
180 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
181 groupName = fixMaxLength(
182 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
183
184 JobKey jobKey = new JobKey(jobName, groupName);
185
186 return getScheduledJob(scheduler, jobKey);
187 }
188 catch (Exception e) {
189 throw new SchedulerException(
190 "Unable to get job {jobName=" + jobName + ", groupName=" +
191 groupName + "}",
192 e);
193 }
194 }
195
196 @Override
197 public List<SchedulerResponse> getScheduledJobs()
198 throws SchedulerException {
199
200 if (!PropsValues.SCHEDULER_ENABLED) {
201 return Collections.emptyList();
202 }
203
204 try {
205 List<String> groupNames = _persistedScheduler.getJobGroupNames();
206
207 List<SchedulerResponse> schedulerResponses =
208 new ArrayList<SchedulerResponse>();
209
210 for (String groupName : groupNames) {
211 schedulerResponses.addAll(
212 getScheduledJobs(_persistedScheduler, groupName));
213 }
214
215 groupNames = _memoryScheduler.getJobGroupNames();
216
217 for (String groupName : groupNames) {
218 schedulerResponses.addAll(
219 getScheduledJobs(_memoryScheduler, groupName));
220 }
221
222 return schedulerResponses;
223 }
224 catch (Exception e) {
225 throw new SchedulerException("Unable to get jobs", e);
226 }
227 }
228
229 @Override
230 public List<SchedulerResponse> getScheduledJobs(String groupName)
231 throws SchedulerException {
232
233 if (!PropsValues.SCHEDULER_ENABLED) {
234 return Collections.emptyList();
235 }
236
237 try {
238 Scheduler scheduler = getScheduler(groupName);
239
240 return getScheduledJobs(scheduler, groupName);
241 }
242 catch (Exception e) {
243 throw new SchedulerException(
244 "Unable to get jobs in group " + groupName, e);
245 }
246 }
247
248 @Override
249 public void pause(String groupName) throws SchedulerException {
250 if (!PropsValues.SCHEDULER_ENABLED) {
251 return;
252 }
253
254 try {
255 Scheduler scheduler = getScheduler(groupName);
256
257 groupName = fixMaxLength(
258 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
259
260 Set<JobKey> jobKeys = scheduler.getJobKeys(
261 GroupMatcher.jobGroupEquals(groupName));
262
263 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
264
265 for (JobKey jobKey : jobKeys) {
266 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
267 }
268 }
269 catch (Exception e) {
270 throw new SchedulerException(
271 "Unable to pause jobs in group " + groupName, e);
272 }
273 }
274
275 @Override
276 public void pause(String jobName, String groupName)
277 throws SchedulerException {
278
279 if (!PropsValues.SCHEDULER_ENABLED) {
280 return;
281 }
282
283 try {
284 Scheduler scheduler = getScheduler(groupName);
285
286 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
287 groupName = fixMaxLength(
288 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
289
290 JobKey jobKey = new JobKey(jobName, groupName);
291
292 scheduler.pauseJob(jobKey);
293
294 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
295 }
296 catch (Exception e) {
297 throw new SchedulerException(
298 "Unable to pause job {jobName=" + jobName + ", groupName=" +
299 groupName + "}",
300 e);
301 }
302 }
303
304 @Override
305 public void resume(String groupName) throws SchedulerException {
306 if (!PropsValues.SCHEDULER_ENABLED) {
307 return;
308 }
309
310 try {
311 Scheduler scheduler = getScheduler(groupName);
312
313 groupName = fixMaxLength(
314 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
315
316 Set<JobKey> jobKeys = scheduler.getJobKeys(
317 GroupMatcher.jobGroupEquals(groupName));
318
319 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
320
321 for (JobKey jobKey : jobKeys) {
322 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
323 }
324 }
325 catch (Exception e) {
326 throw new SchedulerException(
327 "Unable to resume jobs in group " + groupName, e);
328 }
329 }
330
331 @Override
332 public void resume(String jobName, String groupName)
333 throws SchedulerException {
334
335 if (!PropsValues.SCHEDULER_ENABLED) {
336 return;
337 }
338
339 try {
340 Scheduler scheduler = getScheduler(groupName);
341
342 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
343 groupName = fixMaxLength(
344 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
345
346 JobKey jobKey = new JobKey(jobName, groupName);
347
348 scheduler.resumeJob(jobKey);
349
350 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
351 }
352 catch (Exception e) {
353 throw new SchedulerException(
354 "Unable to resume job {jobName=" + jobName + ", groupName=" +
355 groupName + "}",
356 e);
357 }
358 }
359
360 @Override
361 public void schedule(
362 com.liferay.portal.kernel.scheduler.Trigger trigger,
363 String description, String destination, Message message)
364 throws SchedulerException {
365
366 if (!PropsValues.SCHEDULER_ENABLED) {
367 return;
368 }
369
370 try {
371 Scheduler scheduler = getScheduler(trigger.getGroupName());
372
373 StorageType storageType = getStorageType(trigger.getGroupName());
374
375 trigger = TriggerFactoryUtil.buildTrigger(
376 trigger.getTriggerType(), trigger.getJobName(),
377 getOriginalGroupName(trigger.getGroupName()),
378 trigger.getStartDate(), trigger.getEndDate(),
379 trigger.getTriggerContent());
380
381 Trigger quartzTrigger = QuartzTriggerHelperUtil.getQuartzTrigger(
382 trigger);
383
384 if (quartzTrigger == null) {
385 return;
386 }
387
388 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
389
390 if (message == null) {
391 message = new Message();
392 }
393 else {
394 message = message.clone();
395 }
396
397 registerMessageListeners(
398 trigger.getJobName(), trigger.getGroupName(), destination,
399 message);
400
401 schedule(
402 scheduler, storageType, quartzTrigger, description, destination,
403 message);
404 }
405 catch (RuntimeException re) {
406
407
408
409
410 }
411 catch (Exception e) {
412 throw new SchedulerException("Unable to schedule job", e);
413 }
414 }
415
416 @Override
417 public void shutdown() throws SchedulerException {
418 if (!PropsValues.SCHEDULER_ENABLED) {
419 return;
420 }
421
422 try {
423 if (!_persistedScheduler.isShutdown()) {
424 _persistedScheduler.shutdown(false);
425 }
426
427 if (!_memoryScheduler.isShutdown()) {
428 _memoryScheduler.shutdown(false);
429 }
430 }
431 catch (Exception e) {
432 throw new SchedulerException("Unable to shutdown scheduler", e);
433 }
434 }
435
436 @Override
437 public void start() throws SchedulerException {
438 if (!PropsValues.SCHEDULER_ENABLED) {
439 return;
440 }
441
442 try {
443 _persistedScheduler.start();
444
445 initJobState();
446
447 _memoryScheduler.start();
448 }
449 catch (Exception e) {
450 throw new SchedulerException("Unable to start scheduler", e);
451 }
452 }
453
454 @Override
455 public void suppressError(String jobName, String groupName)
456 throws SchedulerException {
457
458 if (!PropsValues.SCHEDULER_ENABLED) {
459 return;
460 }
461
462 try {
463 Scheduler scheduler = getScheduler(groupName);
464
465 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
466 groupName = fixMaxLength(
467 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
468
469 JobKey jobKey = new JobKey(jobName, groupName);
470
471 updateJobState(scheduler, jobKey, null, true);
472 }
473 catch (Exception e) {
474 throw new SchedulerException(
475 "Unable to suppress error for job {jobName=" + jobName +
476 ", groupName=" + groupName + "}",
477 e);
478 }
479 }
480
481 @Override
482 public void unschedule(String groupName) throws SchedulerException {
483 if (!PropsValues.SCHEDULER_ENABLED) {
484 return;
485 }
486
487 try {
488 Scheduler scheduler = getScheduler(groupName);
489
490 groupName = fixMaxLength(
491 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
492
493 Set<JobKey> jobKeys = scheduler.getJobKeys(
494 GroupMatcher.jobGroupEquals(groupName));
495
496 for (JobKey jobKey : jobKeys) {
497 unschedule(scheduler, jobKey);
498 }
499 }
500 catch (Exception e) {
501 throw new SchedulerException(
502 "Unable to unschedule jobs in group " + groupName, e);
503 }
504 }
505
506 @Override
507 public void unschedule(String jobName, String groupName)
508 throws SchedulerException {
509
510 if (!PropsValues.SCHEDULER_ENABLED) {
511 return;
512 }
513
514 try {
515 Scheduler scheduler = getScheduler(groupName);
516
517 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
518 groupName = fixMaxLength(
519 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
520
521 JobKey jobKey = new JobKey(jobName, groupName);
522
523 unschedule(scheduler, jobKey);
524 }
525 catch (Exception e) {
526 throw new SchedulerException(
527 "Unable to unschedule job {jobName=" + jobName +
528 ", groupName=" + groupName + "}",
529 e);
530 }
531 }
532
533 @Override
534 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
535 throws SchedulerException {
536
537 if (!PropsValues.SCHEDULER_ENABLED) {
538 return;
539 }
540
541 try {
542 Scheduler scheduler = getScheduler(trigger.getGroupName());
543
544 trigger = TriggerFactoryUtil.buildTrigger(
545 trigger.getTriggerType(), trigger.getJobName(),
546 getOriginalGroupName(trigger.getGroupName()),
547 trigger.getStartDate(), trigger.getEndDate(),
548 trigger.getTriggerContent());
549
550 update(scheduler, trigger);
551 }
552 catch (Exception e) {
553 throw new SchedulerException("Unable to update trigger", e);
554 }
555 }
556
557 protected String fixMaxLength(String argument, int maxLength) {
558 if (argument == null) {
559 return null;
560 }
561
562 if (argument.length() > maxLength) {
563 argument = argument.substring(0, maxLength);
564 }
565
566 return argument;
567 }
568
569 protected JobState getJobState(JobDataMap jobDataMap) {
570 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
571 JOB_STATE);
572
573 return JobStateSerializeUtil.deserialize(jobStateMap);
574 }
575
576 protected Message getMessage(JobDataMap jobDataMap) {
577 String messageJSON = (String)jobDataMap.get(MESSAGE);
578
579 return (Message)JSONFactoryUtil.deserialize(messageJSON);
580 }
581
582 protected MessageListener getMessageListener(
583 String messageListenerClassName, ClassLoader classLoader)
584 throws SchedulerException {
585
586 MessageListener schedulerEventListener = null;
587
588 try {
589 Class<? extends MessageListener> clazz =
590 (Class<? extends MessageListener>)classLoader.loadClass(
591 messageListenerClassName);
592
593 schedulerEventListener = clazz.newInstance();
594
595 schedulerEventListener =
596 (MessageListener)ProxyUtil.newProxyInstance(
597 classLoader, new Class<?>[] {MessageListener.class},
598 new ClassLoaderBeanHandler(
599 schedulerEventListener, classLoader));
600 }
601 catch (Exception e) {
602 throw new SchedulerException(
603 "Unable to register message listener with name " +
604 messageListenerClassName,
605 e);
606 }
607
608 return schedulerEventListener;
609 }
610
611 protected String getOriginalGroupName(String groupName) {
612 int pos = groupName.indexOf(CharPool.POUND);
613
614 return groupName.substring(pos + 1);
615 }
616
617 protected SchedulerResponse getScheduledJob(
618 Scheduler scheduler, JobKey jobKey)
619 throws Exception {
620
621 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
622
623 if (jobDetail == null) {
624 return null;
625 }
626
627 JobDataMap jobDataMap = jobDetail.getJobDataMap();
628
629 String description = jobDataMap.getString(DESCRIPTION);
630 String destinationName = jobDataMap.getString(DESTINATION_NAME);
631 Message message = getMessage(jobDataMap);
632 StorageType storageType = StorageType.valueOf(
633 jobDataMap.getString(STORAGE_TYPE));
634
635 SchedulerResponse schedulerResponse = null;
636
637 String jobName = jobKey.getName();
638 String groupName = jobKey.getGroup();
639
640 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
641
642 Trigger trigger = scheduler.getTrigger(triggerKey);
643
644 JobState jobState = getJobState(jobDataMap);
645
646 message.put(JOB_STATE, jobState);
647
648 if (trigger == null) {
649 schedulerResponse = new SchedulerResponse();
650
651 schedulerResponse.setDescription(description);
652 schedulerResponse.setDestinationName(destinationName);
653 schedulerResponse.setGroupName(groupName);
654 schedulerResponse.setJobName(jobName);
655 schedulerResponse.setMessage(message);
656 schedulerResponse.setStorageType(storageType);
657 }
658 else {
659 message.put(END_TIME, trigger.getEndTime());
660 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
661 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
662 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
663 message.put(START_TIME, trigger.getStartTime());
664
665 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
666 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
667
668 schedulerResponse = new SchedulerResponse();
669
670 schedulerResponse.setDescription(description);
671 schedulerResponse.setDestinationName(destinationName);
672 schedulerResponse.setMessage(message);
673 schedulerResponse.setStorageType(storageType);
674 schedulerResponse.setTrigger(
675 new com.liferay.portal.kernel.scheduler.CronTrigger(
676 jobName, groupName, cronTrigger.getStartTime(),
677 cronTrigger.getEndTime(),
678 cronTrigger.getCronExpression()));
679 }
680 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
681 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
682
683 schedulerResponse = new SchedulerResponse();
684
685 schedulerResponse.setDescription(description);
686 schedulerResponse.setDestinationName(destinationName);
687 schedulerResponse.setMessage(message);
688 schedulerResponse.setStorageType(storageType);
689 schedulerResponse.setTrigger(
690 new IntervalTrigger(
691 jobName, groupName, simpleTrigger.getStartTime(),
692 simpleTrigger.getEndTime(),
693 simpleTrigger.getRepeatInterval()));
694 }
695 }
696
697 return schedulerResponse;
698 }
699
700 protected List<SchedulerResponse> getScheduledJobs(
701 Scheduler scheduler, String groupName)
702 throws Exception {
703
704 groupName = fixMaxLength(
705 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
706
707 List<SchedulerResponse> schedulerResponses =
708 new ArrayList<SchedulerResponse>();
709
710 Set<JobKey> jobKeys = scheduler.getJobKeys(
711 GroupMatcher.jobGroupEquals(groupName));
712
713 for (JobKey jobKey : jobKeys) {
714 SchedulerResponse schedulerResponse = getScheduledJob(
715 scheduler, jobKey);
716
717 if (schedulerResponse != null) {
718 schedulerResponses.add(schedulerResponse);
719 }
720 }
721
722 return schedulerResponses;
723 }
724
725 protected Scheduler getScheduler(String groupName) throws Exception {
726 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
727 return _persistedScheduler;
728 }
729 else {
730 return _memoryScheduler;
731 }
732 }
733
734 protected StorageType getStorageType(String groupName) {
735 int pos = groupName.indexOf(CharPool.POUND);
736
737 String storageTypeString = groupName.substring(0, pos);
738
739 return StorageType.valueOf(storageTypeString);
740 }
741
742 protected Scheduler initializeScheduler(
743 String propertiesPrefix, boolean useQuartzCluster)
744 throws Exception {
745
746 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
747
748 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
749
750 if (useQuartzCluster) {
751 DB db = DBFactoryUtil.getDB();
752
753 String dbType = db.getType();
754
755 if (dbType.equals(DB.TYPE_SQLSERVER)) {
756 String lockHandlerClassName = properties.getProperty(
757 "org.quartz.jobStore.lockHandler.class");
758
759 if (Validator.isNull(lockHandlerClassName)) {
760 properties.setProperty(
761 "org.quartz.jobStore.lockHandler.class",
762 UpdateLockRowSemaphore.class.getName());
763 }
764 }
765
766 if (PropsValues.CLUSTER_LINK_ENABLED) {
767 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
768 _log.error("Unable to cluster scheduler on Hypersonic");
769 }
770 else {
771 properties.put(
772 "org.quartz.jobStore.isClustered",
773 Boolean.TRUE.toString());
774 }
775 }
776 }
777
778 schedulerFactory.initialize(properties);
779
780 return schedulerFactory.getScheduler();
781 }
782
783 protected void initJobState() throws Exception {
784 List<String> groupNames = _persistedScheduler.getJobGroupNames();
785
786 for (String groupName : groupNames) {
787 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
788 GroupMatcher.jobGroupEquals(groupName));
789
790 for (JobKey jobKey : jobkeys) {
791 Trigger trigger = _persistedScheduler.getTrigger(
792 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
793
794 if (trigger != null) {
795 continue;
796 }
797
798 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
799
800 JobDataMap jobDataMap = jobDetail.getJobDataMap();
801
802 Message message = getMessage(jobDataMap);
803
804 message.put(JOB_NAME, jobKey.getName());
805 message.put(GROUP_NAME, jobKey.getGroup());
806
807 SchedulerEngineHelperUtil.auditSchedulerJobs(
808 message, TriggerState.EXPIRED);
809
810 _persistedScheduler.deleteJob(jobKey);
811 }
812 }
813 }
814
815 protected void registerMessageListeners(
816 String jobName, String groupName, String destinationName,
817 Message message)
818 throws SchedulerException {
819
820 String messageListenerClassName = message.getString(
821 MESSAGE_LISTENER_CLASS_NAME);
822
823 if (Validator.isNull(messageListenerClassName)) {
824 return;
825 }
826
827 String portletId = message.getString(PORTLET_ID);
828
829 ClassLoader classLoader = null;
830
831 if (Validator.isNull(portletId)) {
832 classLoader = ClassLoaderUtil.getPortalClassLoader();
833 }
834 else {
835 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
836
837 if (classLoader == null) {
838
839
840
841
842
843 classLoader = ClassLoaderPool.getClassLoader(portletId);
844 }
845 }
846
847 if (classLoader == null) {
848 throw new SchedulerException(
849 "Unable to find class loader for portlet " + portletId);
850 }
851
852 MessageListener schedulerEventListener = getMessageListener(
853 messageListenerClassName, classLoader);
854
855 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
856 new SchedulerEventMessageListenerWrapper();
857
858 schedulerEventListenerWrapper.setGroupName(groupName);
859 schedulerEventListenerWrapper.setJobName(jobName);
860 schedulerEventListenerWrapper.setMessageListener(
861 schedulerEventListener);
862
863 schedulerEventListenerWrapper.afterPropertiesSet();
864
865 MessageBusUtil.registerMessageListener(
866 destinationName, schedulerEventListenerWrapper);
867
868 message.put(
869 MESSAGE_LISTENER_UUID,
870 schedulerEventListenerWrapper.getMessageListenerUUID());
871
872 message.put(RECEIVER_KEY, new ReceiverKey(jobName, groupName));
873 }
874
875 protected void schedule(
876 Scheduler scheduler, StorageType storageType, Trigger trigger,
877 String description, String destinationName, Message message)
878 throws Exception {
879
880 try {
881 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
882
883 jobBuilder.withIdentity(trigger.getJobKey());
884
885 jobBuilder.storeDurably();
886
887 JobDetail jobDetail = jobBuilder.build();
888
889 JobDataMap jobDataMap = jobDetail.getJobDataMap();
890
891 jobDataMap.put(DESCRIPTION, description);
892 jobDataMap.put(DESTINATION_NAME, destinationName);
893 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
894 jobDataMap.put(STORAGE_TYPE, storageType.toString());
895
896 JobState jobState = new JobState(
897 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
898
899 jobDataMap.put(
900 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
901
902 unregisterMessageListener(scheduler, trigger.getJobKey());
903
904 synchronized (this) {
905 scheduler.deleteJob(trigger.getJobKey());
906 scheduler.scheduleJob(jobDetail, trigger);
907 }
908 }
909 catch (ObjectAlreadyExistsException oaee) {
910 if (_log.isInfoEnabled()) {
911 _log.info("Message is already scheduled");
912 }
913 }
914 }
915
916 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
917 throws Exception {
918
919 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
920
921 if (jobDetail == null) {
922 return;
923 }
924
925 JobDataMap jobDataMap = jobDetail.getJobDataMap();
926
927 if (jobDataMap == null) {
928 return;
929 }
930
931 Message message = getMessage(jobDataMap);
932
933 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
934
935 if (messageListenerUUID == null) {
936 return;
937 }
938
939 String destinationName = jobDataMap.getString(DESTINATION_NAME);
940
941 MessageBus messageBus = MessageBusUtil.getMessageBus();
942
943 Destination destination = messageBus.getDestination(destinationName);
944
945 if (destination == null) {
946 return;
947 }
948
949 Set<MessageListener> messageListeners =
950 destination.getMessageListeners();
951
952 for (MessageListener messageListener : messageListeners) {
953 if (!(messageListener instanceof InvokerMessageListener)) {
954 continue;
955 }
956
957 InvokerMessageListener invokerMessageListener =
958 (InvokerMessageListener)messageListener;
959
960 messageListener = invokerMessageListener.getMessageListener();
961
962 if (!(messageListener instanceof
963 SchedulerEventMessageListenerWrapper)) {
964
965 continue;
966 }
967
968 SchedulerEventMessageListenerWrapper schedulerMessageListener =
969 (SchedulerEventMessageListenerWrapper)messageListener;
970
971 if (messageListenerUUID.equals(
972 schedulerMessageListener.getMessageListenerUUID())) {
973
974 messageBus.unregisterMessageListener(
975 destinationName, schedulerMessageListener);
976
977 return;
978 }
979 }
980 }
981
982 protected void unschedule(Scheduler scheduler, JobKey jobKey)
983 throws Exception {
984
985 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
986
987 TriggerKey triggerKey = new TriggerKey(
988 jobKey.getName(), jobKey.getGroup());
989
990 if (jobDetail == null) {
991 return;
992 }
993
994 unregisterMessageListener(scheduler, jobKey);
995
996 JobDataMap jobDataMap = jobDetail.getJobDataMap();
997
998 JobState jobState = getJobState(jobDataMap);
999
1000 Trigger trigger = scheduler.getTrigger(triggerKey);
1001
1002 jobState.setTriggerDate(END_TIME, new Date());
1003 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1004 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1005 jobState.setTriggerDate(
1006 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1007 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1008
1009 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1010
1011 jobState.clearExceptions();
1012
1013 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1014
1015 scheduler.unscheduleJob(triggerKey);
1016
1017 scheduler.addJob(jobDetail, true);
1018 }
1019
1020 protected void update(
1021 Scheduler scheduler,
1022 com.liferay.portal.kernel.scheduler.Trigger trigger)
1023 throws Exception {
1024
1025 Trigger quartzTrigger = QuartzTriggerHelperUtil.getQuartzTrigger(
1026 trigger);
1027
1028 if (quartzTrigger == null) {
1029 return;
1030 }
1031
1032 TriggerKey triggerKey = quartzTrigger.getKey();
1033
1034 if (scheduler.getTrigger(triggerKey) != null) {
1035 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1036 }
1037 else {
1038 JobKey jobKey = quartzTrigger.getJobKey();
1039
1040 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1041
1042 if (jobDetail == null) {
1043 return;
1044 }
1045
1046 synchronized (this) {
1047 scheduler.deleteJob(jobKey);
1048 scheduler.scheduleJob(jobDetail, quartzTrigger);
1049 }
1050
1051 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1052 }
1053 }
1054
1055 protected void updateJobState(
1056 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1057 boolean suppressError)
1058 throws Exception {
1059
1060 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1061
1062 if (jobDetail == null) {
1063 return;
1064 }
1065
1066 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1067
1068 JobState jobState = getJobState(jobDataMap);
1069
1070 if (triggerState != null) {
1071 jobState.setTriggerState(triggerState);
1072 }
1073
1074 if (suppressError) {
1075 jobState.clearExceptions();
1076 }
1077
1078 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1079
1080 scheduler.addJob(jobDetail, true);
1081 }
1082
1083 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1084 protected QuartzLocalService quartzLocalService;
1085
1086 private static Log _log = LogFactoryUtil.getLog(
1087 QuartzSchedulerEngine.class);
1088
1089 private Scheduler _memoryScheduler;
1090 private Scheduler _persistedScheduler;
1091
1092 }