001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.scheduler.quartz;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019    import com.liferay.portal.kernel.dao.db.DB;
020    import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021    import com.liferay.portal.kernel.json.JSONFactoryUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.Destination;
025    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026    import com.liferay.portal.kernel.messaging.Message;
027    import com.liferay.portal.kernel.messaging.MessageBus;
028    import com.liferay.portal.kernel.messaging.MessageBusUtil;
029    import com.liferay.portal.kernel.messaging.MessageListener;
030    import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031    import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032    import com.liferay.portal.kernel.scheduler.JobState;
033    import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036    import com.liferay.portal.kernel.scheduler.SchedulerException;
037    import com.liferay.portal.kernel.scheduler.StorageType;
038    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039    import com.liferay.portal.kernel.scheduler.TriggerState;
040    import com.liferay.portal.kernel.scheduler.messaging.ReceiverKey;
041    import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043    import com.liferay.portal.kernel.util.CharPool;
044    import com.liferay.portal.kernel.util.ClassLoaderPool;
045    import com.liferay.portal.kernel.util.ProxyUtil;
046    import com.liferay.portal.kernel.util.Validator;
047    import com.liferay.portal.scheduler.job.MessageSenderJob;
048    import com.liferay.portal.service.QuartzLocalService;
049    import com.liferay.portal.util.ClassLoaderUtil;
050    import com.liferay.portal.util.PropsUtil;
051    import com.liferay.portal.util.PropsValues;
052    
053    import java.util.ArrayList;
054    import java.util.Collections;
055    import java.util.Date;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.Properties;
059    import java.util.Set;
060    
061    import org.quartz.CronTrigger;
062    import org.quartz.JobBuilder;
063    import org.quartz.JobDataMap;
064    import org.quartz.JobDetail;
065    import org.quartz.JobKey;
066    import org.quartz.ObjectAlreadyExistsException;
067    import org.quartz.Scheduler;
068    import org.quartz.SimpleTrigger;
069    import org.quartz.Trigger;
070    import org.quartz.TriggerKey;
071    import org.quartz.impl.StdSchedulerFactory;
072    import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
073    import org.quartz.impl.matchers.GroupMatcher;
074    
075    /**
076     * @author Michael C. Han
077     * @author Bruno Farache
078     * @author Shuyang Zhou
079     * @author Wesley Gong
080     * @author Tina Tian
081     */
082    public class QuartzSchedulerEngine implements SchedulerEngine {
083    
084            public void afterPropertiesSet() {
085                    if (!PropsValues.SCHEDULER_ENABLED) {
086                            return;
087                    }
088    
089                    try {
090                            quartzLocalService.checkQuartzTables();
091    
092                            _persistedScheduler = initializeScheduler(
093                                    "persisted.scheduler.", true);
094    
095                            _memoryScheduler = initializeScheduler("memory.scheduler.", false);
096                    }
097                    catch (Exception e) {
098                            _log.error("Unable to initialize engine", e);
099                    }
100            }
101    
102            @Override
103            public void delete(String groupName) throws SchedulerException {
104                    if (!PropsValues.SCHEDULER_ENABLED) {
105                            return;
106                    }
107    
108                    try {
109                            Scheduler scheduler = getScheduler(groupName);
110    
111                            groupName = fixMaxLength(
112                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
113    
114                            Set<JobKey> jobKeys = scheduler.getJobKeys(
115                                    GroupMatcher.jobGroupEquals(groupName));
116    
117                            for (JobKey jobKey : jobKeys) {
118                                    unregisterMessageListener(scheduler, jobKey);
119    
120                                    scheduler.deleteJob(jobKey);
121                            }
122                    }
123                    catch (Exception e) {
124                            throw new SchedulerException(
125                                    "Unable to delete jobs in group " + groupName, e);
126                    }
127            }
128    
129            @Override
130            public void delete(String jobName, String groupName)
131                    throws SchedulerException {
132    
133                    if (!PropsValues.SCHEDULER_ENABLED) {
134                            return;
135                    }
136    
137                    try {
138                            Scheduler scheduler = getScheduler(groupName);
139    
140                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
141                            groupName = fixMaxLength(
142                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
143    
144                            JobKey jobKey = new JobKey(jobName, groupName);
145    
146                            unregisterMessageListener(scheduler, jobKey);
147    
148                            scheduler.deleteJob(jobKey);
149                    }
150                    catch (Exception e) {
151                            throw new SchedulerException(
152                                    "Unable to delete job {jobName=" + jobName + ", groupName=" +
153                                            groupName + "}",
154                                    e);
155                    }
156            }
157    
158            public void destroy() {
159                    try {
160                            shutdown();
161                    }
162                    catch (SchedulerException se) {
163                            if (_log.isWarnEnabled()) {
164                                    _log.warn("Unable to shutdown", se);
165                            }
166                    }
167            }
168    
169            @Override
170            public SchedulerResponse getScheduledJob(String jobName, String groupName)
171                    throws SchedulerException {
172    
173                    if (!PropsValues.SCHEDULER_ENABLED) {
174                            return null;
175                    }
176    
177                    try {
178                            Scheduler scheduler = getScheduler(groupName);
179    
180                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
181                            groupName = fixMaxLength(
182                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
183    
184                            JobKey jobKey = new JobKey(jobName, groupName);
185    
186                            return getScheduledJob(scheduler, jobKey);
187                    }
188                    catch (Exception e) {
189                            throw new SchedulerException(
190                                    "Unable to get job {jobName=" + jobName + ", groupName=" +
191                                            groupName + "}",
192                                    e);
193                    }
194            }
195    
196            @Override
197            public List<SchedulerResponse> getScheduledJobs()
198                    throws SchedulerException {
199    
200                    if (!PropsValues.SCHEDULER_ENABLED) {
201                            return Collections.emptyList();
202                    }
203    
204                    try {
205                            List<String> groupNames = _persistedScheduler.getJobGroupNames();
206    
207                            List<SchedulerResponse> schedulerResponses =
208                                    new ArrayList<SchedulerResponse>();
209    
210                            for (String groupName : groupNames) {
211                                    schedulerResponses.addAll(
212                                            getScheduledJobs(_persistedScheduler, groupName));
213                            }
214    
215                            groupNames = _memoryScheduler.getJobGroupNames();
216    
217                            for (String groupName : groupNames) {
218                                    schedulerResponses.addAll(
219                                            getScheduledJobs(_memoryScheduler, groupName));
220                            }
221    
222                            return schedulerResponses;
223                    }
224                    catch (Exception e) {
225                            throw new SchedulerException("Unable to get jobs", e);
226                    }
227            }
228    
229            @Override
230            public List<SchedulerResponse> getScheduledJobs(String groupName)
231                    throws SchedulerException {
232    
233                    if (!PropsValues.SCHEDULER_ENABLED) {
234                            return Collections.emptyList();
235                    }
236    
237                    try {
238                            Scheduler scheduler = getScheduler(groupName);
239    
240                            return getScheduledJobs(scheduler, groupName);
241                    }
242                    catch (Exception e) {
243                            throw new SchedulerException(
244                                    "Unable to get jobs in group " + groupName, e);
245                    }
246            }
247    
248            @Override
249            public void pause(String groupName) throws SchedulerException {
250                    if (!PropsValues.SCHEDULER_ENABLED) {
251                            return;
252                    }
253    
254                    try {
255                            Scheduler scheduler = getScheduler(groupName);
256    
257                            groupName = fixMaxLength(
258                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
259    
260                            Set<JobKey> jobKeys = scheduler.getJobKeys(
261                                    GroupMatcher.jobGroupEquals(groupName));
262    
263                            scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
264    
265                            for (JobKey jobKey : jobKeys) {
266                                    updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
267                            }
268                    }
269                    catch (Exception e) {
270                            throw new SchedulerException(
271                                    "Unable to pause jobs in group " + groupName, e);
272                    }
273            }
274    
275            @Override
276            public void pause(String jobName, String groupName)
277                    throws SchedulerException {
278    
279                    if (!PropsValues.SCHEDULER_ENABLED) {
280                            return;
281                    }
282    
283                    try {
284                            Scheduler scheduler = getScheduler(groupName);
285    
286                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
287                            groupName = fixMaxLength(
288                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
289    
290                            JobKey jobKey = new JobKey(jobName, groupName);
291    
292                            scheduler.pauseJob(jobKey);
293    
294                            updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
295                    }
296                    catch (Exception e) {
297                            throw new SchedulerException(
298                                    "Unable to pause job {jobName=" + jobName + ", groupName=" +
299                                            groupName + "}",
300                                    e);
301                    }
302            }
303    
304            @Override
305            public void resume(String groupName) throws SchedulerException {
306                    if (!PropsValues.SCHEDULER_ENABLED) {
307                            return;
308                    }
309    
310                    try {
311                            Scheduler scheduler = getScheduler(groupName);
312    
313                            groupName = fixMaxLength(
314                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
315    
316                            Set<JobKey> jobKeys = scheduler.getJobKeys(
317                                    GroupMatcher.jobGroupEquals(groupName));
318    
319                            scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
320    
321                            for (JobKey jobKey : jobKeys) {
322                                    updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
323                            }
324                    }
325                    catch (Exception e) {
326                            throw new SchedulerException(
327                                    "Unable to resume jobs in group " + groupName, e);
328                    }
329            }
330    
331            @Override
332            public void resume(String jobName, String groupName)
333                    throws SchedulerException {
334    
335                    if (!PropsValues.SCHEDULER_ENABLED) {
336                            return;
337                    }
338    
339                    try {
340                            Scheduler scheduler = getScheduler(groupName);
341    
342                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
343                            groupName = fixMaxLength(
344                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
345    
346                            JobKey jobKey = new JobKey(jobName, groupName);
347    
348                            scheduler.resumeJob(jobKey);
349    
350                            updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
351                    }
352                    catch (Exception e) {
353                            throw new SchedulerException(
354                                    "Unable to resume job {jobName=" + jobName + ", groupName=" +
355                                            groupName + "}",
356                                    e);
357                    }
358            }
359    
360            @Override
361            public void schedule(
362                            com.liferay.portal.kernel.scheduler.Trigger trigger,
363                            String description, String destination, Message message)
364                    throws SchedulerException {
365    
366                    if (!PropsValues.SCHEDULER_ENABLED) {
367                            return;
368                    }
369    
370                    try {
371                            Scheduler scheduler = getScheduler(trigger.getGroupName());
372    
373                            StorageType storageType = getStorageType(trigger.getGroupName());
374    
375                            trigger = TriggerFactoryUtil.buildTrigger(
376                                    trigger.getTriggerType(), trigger.getJobName(),
377                                    getOriginalGroupName(trigger.getGroupName()),
378                                    trigger.getStartDate(), trigger.getEndDate(),
379                                    trigger.getTriggerContent());
380    
381                            Trigger quartzTrigger = QuartzTriggerHelperUtil.getQuartzTrigger(
382                                    trigger);
383    
384                            if (quartzTrigger == null) {
385                                    return;
386                            }
387    
388                            description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
389    
390                            if (message == null) {
391                                    message = new Message();
392                            }
393                            else {
394                                    message = message.clone();
395                            }
396    
397                            registerMessageListeners(
398                                    trigger.getJobName(), trigger.getGroupName(), destination,
399                                    message);
400    
401                            schedule(
402                                    scheduler, storageType, quartzTrigger, description, destination,
403                                    message);
404                    }
405                    catch (RuntimeException re) {
406    
407                            // ServerDetector will throw an exception when JobSchedulerImpl is
408                            // initialized in a test environment
409    
410                    }
411                    catch (Exception e) {
412                            throw new SchedulerException("Unable to schedule job", e);
413                    }
414            }
415    
416            @Override
417            public void shutdown() throws SchedulerException {
418                    if (!PropsValues.SCHEDULER_ENABLED) {
419                            return;
420                    }
421    
422                    try {
423                            if (!_persistedScheduler.isShutdown()) {
424                                    _persistedScheduler.shutdown(false);
425                            }
426    
427                            if (!_memoryScheduler.isShutdown()) {
428                                    _memoryScheduler.shutdown(false);
429                            }
430                    }
431                    catch (Exception e) {
432                            throw new SchedulerException("Unable to shutdown scheduler", e);
433                    }
434            }
435    
436            @Override
437            public void start() throws SchedulerException {
438                    if (!PropsValues.SCHEDULER_ENABLED) {
439                            return;
440                    }
441    
442                    try {
443                            _persistedScheduler.start();
444    
445                            initJobState();
446    
447                            _memoryScheduler.start();
448                    }
449                    catch (Exception e) {
450                            throw new SchedulerException("Unable to start scheduler", e);
451                    }
452            }
453    
454            @Override
455            public void suppressError(String jobName, String groupName)
456                    throws SchedulerException {
457    
458                    if (!PropsValues.SCHEDULER_ENABLED) {
459                            return;
460                    }
461    
462                    try {
463                            Scheduler scheduler = getScheduler(groupName);
464    
465                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
466                            groupName = fixMaxLength(
467                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
468    
469                            JobKey jobKey = new JobKey(jobName, groupName);
470    
471                            updateJobState(scheduler, jobKey, null, true);
472                    }
473                    catch (Exception e) {
474                            throw new SchedulerException(
475                                    "Unable to suppress error for job {jobName=" + jobName +
476                                            ", groupName=" + groupName + "}",
477                                    e);
478                    }
479            }
480    
481            @Override
482            public void unschedule(String groupName) throws SchedulerException {
483                    if (!PropsValues.SCHEDULER_ENABLED) {
484                            return;
485                    }
486    
487                    try {
488                            Scheduler scheduler = getScheduler(groupName);
489    
490                            groupName = fixMaxLength(
491                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
492    
493                            Set<JobKey> jobKeys = scheduler.getJobKeys(
494                                    GroupMatcher.jobGroupEquals(groupName));
495    
496                            for (JobKey jobKey : jobKeys) {
497                                    unschedule(scheduler, jobKey);
498                            }
499                    }
500                    catch (Exception e) {
501                            throw new SchedulerException(
502                                    "Unable to unschedule jobs in group " + groupName, e);
503                    }
504            }
505    
506            @Override
507            public void unschedule(String jobName, String groupName)
508                    throws SchedulerException {
509    
510                    if (!PropsValues.SCHEDULER_ENABLED) {
511                            return;
512                    }
513    
514                    try {
515                            Scheduler scheduler = getScheduler(groupName);
516    
517                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
518                            groupName = fixMaxLength(
519                                    getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
520    
521                            JobKey jobKey = new JobKey(jobName, groupName);
522    
523                            unschedule(scheduler, jobKey);
524                    }
525                    catch (Exception e) {
526                            throw new SchedulerException(
527                                    "Unable to unschedule job {jobName=" + jobName +
528                                            ", groupName=" + groupName + "}",
529                                    e);
530                    }
531            }
532    
533            @Override
534            public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
535                    throws SchedulerException {
536    
537                    if (!PropsValues.SCHEDULER_ENABLED) {
538                            return;
539                    }
540    
541                    try {
542                            Scheduler scheduler = getScheduler(trigger.getGroupName());
543    
544                            trigger = TriggerFactoryUtil.buildTrigger(
545                                    trigger.getTriggerType(), trigger.getJobName(),
546                                    getOriginalGroupName(trigger.getGroupName()),
547                                    trigger.getStartDate(), trigger.getEndDate(),
548                                    trigger.getTriggerContent());
549    
550                            update(scheduler, trigger);
551                    }
552                    catch (Exception e) {
553                            throw new SchedulerException("Unable to update trigger", e);
554                    }
555            }
556    
557            protected String fixMaxLength(String argument, int maxLength) {
558                    if (argument == null) {
559                            return null;
560                    }
561    
562                    if (argument.length() > maxLength) {
563                            argument = argument.substring(0, maxLength);
564                    }
565    
566                    return argument;
567            }
568    
569            protected JobState getJobState(JobDataMap jobDataMap) {
570                    Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
571                            JOB_STATE);
572    
573                    return JobStateSerializeUtil.deserialize(jobStateMap);
574            }
575    
576            protected Message getMessage(JobDataMap jobDataMap) {
577                    String messageJSON = (String)jobDataMap.get(MESSAGE);
578    
579                    return (Message)JSONFactoryUtil.deserialize(messageJSON);
580            }
581    
582            protected MessageListener getMessageListener(
583                            String messageListenerClassName, ClassLoader classLoader)
584                    throws SchedulerException {
585    
586                    MessageListener schedulerEventListener = null;
587    
588                    try {
589                            Class<? extends MessageListener> clazz =
590                                    (Class<? extends MessageListener>)classLoader.loadClass(
591                                            messageListenerClassName);
592    
593                            schedulerEventListener = clazz.newInstance();
594    
595                            schedulerEventListener =
596                                    (MessageListener)ProxyUtil.newProxyInstance(
597                                            classLoader, new Class<?>[] {MessageListener.class},
598                                            new ClassLoaderBeanHandler(
599                                                    schedulerEventListener, classLoader));
600                    }
601                    catch (Exception e) {
602                            throw new SchedulerException(
603                                    "Unable to register message listener with name " +
604                                            messageListenerClassName,
605                                    e);
606                    }
607    
608                    return schedulerEventListener;
609            }
610    
611            protected String getOriginalGroupName(String groupName) {
612                    int pos = groupName.indexOf(CharPool.POUND);
613    
614                    return groupName.substring(pos + 1);
615            }
616    
617            protected SchedulerResponse getScheduledJob(
618                            Scheduler scheduler, JobKey jobKey)
619                    throws Exception {
620    
621                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
622    
623                    if (jobDetail == null) {
624                            return null;
625                    }
626    
627                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
628    
629                    String description = jobDataMap.getString(DESCRIPTION);
630                    String destinationName = jobDataMap.getString(DESTINATION_NAME);
631                    Message message = getMessage(jobDataMap);
632                    StorageType storageType = StorageType.valueOf(
633                            jobDataMap.getString(STORAGE_TYPE));
634    
635                    SchedulerResponse schedulerResponse = null;
636    
637                    String jobName = jobKey.getName();
638                    String groupName = jobKey.getGroup();
639    
640                    TriggerKey triggerKey = new TriggerKey(jobName, groupName);
641    
642                    Trigger trigger = scheduler.getTrigger(triggerKey);
643    
644                    JobState jobState = getJobState(jobDataMap);
645    
646                    message.put(JOB_STATE, jobState);
647    
648                    if (trigger == null) {
649                            schedulerResponse = new SchedulerResponse();
650    
651                            schedulerResponse.setDescription(description);
652                            schedulerResponse.setDestinationName(destinationName);
653                            schedulerResponse.setGroupName(groupName);
654                            schedulerResponse.setJobName(jobName);
655                            schedulerResponse.setMessage(message);
656                            schedulerResponse.setStorageType(storageType);
657                    }
658                    else {
659                            message.put(END_TIME, trigger.getEndTime());
660                            message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
661                            message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
662                            message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
663                            message.put(START_TIME, trigger.getStartTime());
664    
665                            if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
666                                    CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
667    
668                                    schedulerResponse = new SchedulerResponse();
669    
670                                    schedulerResponse.setDescription(description);
671                                    schedulerResponse.setDestinationName(destinationName);
672                                    schedulerResponse.setMessage(message);
673                                    schedulerResponse.setStorageType(storageType);
674                                    schedulerResponse.setTrigger(
675                                            new com.liferay.portal.kernel.scheduler.CronTrigger(
676                                                    jobName, groupName, cronTrigger.getStartTime(),
677                                                    cronTrigger.getEndTime(),
678                                                    cronTrigger.getCronExpression()));
679                            }
680                            else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
681                                    SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
682    
683                                    schedulerResponse = new SchedulerResponse();
684    
685                                    schedulerResponse.setDescription(description);
686                                    schedulerResponse.setDestinationName(destinationName);
687                                    schedulerResponse.setMessage(message);
688                                    schedulerResponse.setStorageType(storageType);
689                                    schedulerResponse.setTrigger(
690                                            new IntervalTrigger(
691                                                    jobName, groupName, simpleTrigger.getStartTime(),
692                                                    simpleTrigger.getEndTime(),
693                                                    simpleTrigger.getRepeatInterval()));
694                            }
695                    }
696    
697                    return schedulerResponse;
698            }
699    
700            protected List<SchedulerResponse> getScheduledJobs(
701                            Scheduler scheduler, String groupName)
702                    throws Exception {
703    
704                    groupName = fixMaxLength(
705                            getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
706    
707                    List<SchedulerResponse> schedulerResponses =
708                            new ArrayList<SchedulerResponse>();
709    
710                    Set<JobKey> jobKeys = scheduler.getJobKeys(
711                            GroupMatcher.jobGroupEquals(groupName));
712    
713                    for (JobKey jobKey : jobKeys) {
714                            SchedulerResponse schedulerResponse = getScheduledJob(
715                                    scheduler, jobKey);
716    
717                            if (schedulerResponse != null) {
718                                    schedulerResponses.add(schedulerResponse);
719                            }
720                    }
721    
722                    return schedulerResponses;
723            }
724    
725            protected Scheduler getScheduler(String groupName) throws Exception {
726                    if (groupName.startsWith(StorageType.PERSISTED.toString())) {
727                            return _persistedScheduler;
728                    }
729                    else {
730                            return _memoryScheduler;
731                    }
732            }
733    
734            protected StorageType getStorageType(String groupName) {
735                    int pos = groupName.indexOf(CharPool.POUND);
736    
737                    String storageTypeString = groupName.substring(0, pos);
738    
739                    return StorageType.valueOf(storageTypeString);
740            }
741    
742            protected Scheduler initializeScheduler(
743                            String propertiesPrefix, boolean useQuartzCluster)
744                    throws Exception {
745    
746                    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
747    
748                    Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
749    
750                    if (useQuartzCluster) {
751                            DB db = DBFactoryUtil.getDB();
752    
753                            String dbType = db.getType();
754    
755                            if (dbType.equals(DB.TYPE_SQLSERVER)) {
756                                    String lockHandlerClassName = properties.getProperty(
757                                            "org.quartz.jobStore.lockHandler.class");
758    
759                                    if (Validator.isNull(lockHandlerClassName)) {
760                                            properties.setProperty(
761                                                    "org.quartz.jobStore.lockHandler.class",
762                                                    UpdateLockRowSemaphore.class.getName());
763                                    }
764                            }
765    
766                            if (PropsValues.CLUSTER_LINK_ENABLED) {
767                                    if (dbType.equals(DB.TYPE_HYPERSONIC)) {
768                                            _log.error("Unable to cluster scheduler on Hypersonic");
769                                    }
770                                    else {
771                                            properties.put(
772                                                    "org.quartz.jobStore.isClustered",
773                                                    Boolean.TRUE.toString());
774                                    }
775                            }
776                    }
777    
778                    schedulerFactory.initialize(properties);
779    
780                    return schedulerFactory.getScheduler();
781            }
782    
783            protected void initJobState() throws Exception {
784                    List<String> groupNames = _persistedScheduler.getJobGroupNames();
785    
786                    for (String groupName : groupNames) {
787                            Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
788                                    GroupMatcher.jobGroupEquals(groupName));
789    
790                            for (JobKey jobKey : jobkeys) {
791                                    Trigger trigger = _persistedScheduler.getTrigger(
792                                            new TriggerKey(jobKey.getName(), jobKey.getGroup()));
793    
794                                    if (trigger != null) {
795                                            continue;
796                                    }
797    
798                                    JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
799    
800                                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
801    
802                                    Message message = getMessage(jobDataMap);
803    
804                                    message.put(JOB_NAME, jobKey.getName());
805                                    message.put(GROUP_NAME, jobKey.getGroup());
806    
807                                    SchedulerEngineHelperUtil.auditSchedulerJobs(
808                                            message, TriggerState.EXPIRED);
809    
810                                    _persistedScheduler.deleteJob(jobKey);
811                            }
812                    }
813            }
814    
815            protected void registerMessageListeners(
816                            String jobName, String groupName, String destinationName,
817                            Message message)
818                    throws SchedulerException {
819    
820                    String messageListenerClassName = message.getString(
821                            MESSAGE_LISTENER_CLASS_NAME);
822    
823                    if (Validator.isNull(messageListenerClassName)) {
824                            return;
825                    }
826    
827                    String portletId = message.getString(PORTLET_ID);
828    
829                    ClassLoader classLoader = null;
830    
831                    if (Validator.isNull(portletId)) {
832                            classLoader = ClassLoaderUtil.getPortalClassLoader();
833                    }
834                    else {
835                            classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
836    
837                            if (classLoader == null) {
838    
839                                    // No class loader found for the portlet ID, try getting the
840                                    // class loader where we assume the portlet ID is really a
841                                    // servlet context name
842    
843                                    classLoader = ClassLoaderPool.getClassLoader(portletId);
844                            }
845                    }
846    
847                    if (classLoader == null) {
848                            throw new SchedulerException(
849                                    "Unable to find class loader for portlet " + portletId);
850                    }
851    
852                    MessageListener schedulerEventListener = getMessageListener(
853                            messageListenerClassName, classLoader);
854    
855                    SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
856                            new SchedulerEventMessageListenerWrapper();
857    
858                    schedulerEventListenerWrapper.setGroupName(groupName);
859                    schedulerEventListenerWrapper.setJobName(jobName);
860                    schedulerEventListenerWrapper.setMessageListener(
861                            schedulerEventListener);
862    
863                    schedulerEventListenerWrapper.afterPropertiesSet();
864    
865                    MessageBusUtil.registerMessageListener(
866                            destinationName, schedulerEventListenerWrapper);
867    
868                    message.put(
869                            MESSAGE_LISTENER_UUID,
870                            schedulerEventListenerWrapper.getMessageListenerUUID());
871    
872                    message.put(RECEIVER_KEY, new ReceiverKey(jobName, groupName));
873            }
874    
875            protected void schedule(
876                            Scheduler scheduler, StorageType storageType, Trigger trigger,
877                            String description, String destinationName, Message message)
878                    throws Exception {
879    
880                    try {
881                            JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
882    
883                            jobBuilder.withIdentity(trigger.getJobKey());
884    
885                            jobBuilder.storeDurably();
886    
887                            JobDetail jobDetail = jobBuilder.build();
888    
889                            JobDataMap jobDataMap = jobDetail.getJobDataMap();
890    
891                            jobDataMap.put(DESCRIPTION, description);
892                            jobDataMap.put(DESTINATION_NAME, destinationName);
893                            jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
894                            jobDataMap.put(STORAGE_TYPE, storageType.toString());
895    
896                            JobState jobState = new JobState(
897                                    TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
898    
899                            jobDataMap.put(
900                                    JOB_STATE, JobStateSerializeUtil.serialize(jobState));
901    
902                            unregisterMessageListener(scheduler, trigger.getJobKey());
903    
904                            synchronized (this) {
905                                    scheduler.deleteJob(trigger.getJobKey());
906                                    scheduler.scheduleJob(jobDetail, trigger);
907                            }
908                    }
909                    catch (ObjectAlreadyExistsException oaee) {
910                            if (_log.isInfoEnabled()) {
911                                    _log.info("Message is already scheduled");
912                            }
913                    }
914            }
915    
916            protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
917                    throws Exception {
918    
919                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
920    
921                    if (jobDetail == null) {
922                            return;
923                    }
924    
925                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
926    
927                    if (jobDataMap == null) {
928                            return;
929                    }
930    
931                    Message message = getMessage(jobDataMap);
932    
933                    String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
934    
935                    if (messageListenerUUID == null) {
936                            return;
937                    }
938    
939                    String destinationName = jobDataMap.getString(DESTINATION_NAME);
940    
941                    MessageBus messageBus = MessageBusUtil.getMessageBus();
942    
943                    Destination destination = messageBus.getDestination(destinationName);
944    
945                    if (destination == null) {
946                            return;
947                    }
948    
949                    Set<MessageListener> messageListeners =
950                            destination.getMessageListeners();
951    
952                    for (MessageListener messageListener : messageListeners) {
953                            if (!(messageListener instanceof InvokerMessageListener)) {
954                                    continue;
955                            }
956    
957                            InvokerMessageListener invokerMessageListener =
958                                    (InvokerMessageListener)messageListener;
959    
960                            messageListener = invokerMessageListener.getMessageListener();
961    
962                            if (!(messageListener instanceof
963                                            SchedulerEventMessageListenerWrapper)) {
964    
965                                    continue;
966                            }
967    
968                            SchedulerEventMessageListenerWrapper schedulerMessageListener =
969                                    (SchedulerEventMessageListenerWrapper)messageListener;
970    
971                            if (messageListenerUUID.equals(
972                                            schedulerMessageListener.getMessageListenerUUID())) {
973    
974                                    messageBus.unregisterMessageListener(
975                                            destinationName, schedulerMessageListener);
976    
977                                    return;
978                            }
979                    }
980            }
981    
982            protected void unschedule(Scheduler scheduler, JobKey jobKey)
983                    throws Exception {
984    
985                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
986    
987                    TriggerKey triggerKey = new TriggerKey(
988                            jobKey.getName(), jobKey.getGroup());
989    
990                    if (jobDetail == null) {
991                            return;
992                    }
993    
994                    unregisterMessageListener(scheduler, jobKey);
995    
996                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
997    
998                    JobState jobState = getJobState(jobDataMap);
999    
1000                    Trigger trigger = scheduler.getTrigger(triggerKey);
1001    
1002                    jobState.setTriggerDate(END_TIME, new Date());
1003                    jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1004                    jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1005                    jobState.setTriggerDate(
1006                            PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1007                    jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1008    
1009                    jobState.setTriggerState(TriggerState.UNSCHEDULED);
1010    
1011                    jobState.clearExceptions();
1012    
1013                    jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1014    
1015                    scheduler.unscheduleJob(triggerKey);
1016    
1017                    scheduler.addJob(jobDetail, true);
1018            }
1019    
1020            protected void update(
1021                            Scheduler scheduler,
1022                            com.liferay.portal.kernel.scheduler.Trigger trigger)
1023                    throws Exception {
1024    
1025                    Trigger quartzTrigger = QuartzTriggerHelperUtil.getQuartzTrigger(
1026                            trigger);
1027    
1028                    if (quartzTrigger == null) {
1029                            return;
1030                    }
1031    
1032                    TriggerKey triggerKey = quartzTrigger.getKey();
1033    
1034                    if (scheduler.getTrigger(triggerKey) != null) {
1035                            scheduler.rescheduleJob(triggerKey, quartzTrigger);
1036                    }
1037                    else {
1038                            JobKey jobKey = quartzTrigger.getJobKey();
1039    
1040                            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1041    
1042                            if (jobDetail == null) {
1043                                    return;
1044                            }
1045    
1046                            synchronized (this) {
1047                                    scheduler.deleteJob(jobKey);
1048                                    scheduler.scheduleJob(jobDetail, quartzTrigger);
1049                            }
1050    
1051                            updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1052                    }
1053            }
1054    
1055            protected void updateJobState(
1056                            Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1057                            boolean suppressError)
1058                    throws Exception {
1059    
1060                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1061    
1062                    if (jobDetail == null) {
1063                            return;
1064                    }
1065    
1066                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
1067    
1068                    JobState jobState = getJobState(jobDataMap);
1069    
1070                    if (triggerState != null) {
1071                            jobState.setTriggerState(triggerState);
1072                    }
1073    
1074                    if (suppressError) {
1075                            jobState.clearExceptions();
1076                    }
1077    
1078                    jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1079    
1080                    scheduler.addJob(jobDetail, true);
1081            }
1082    
1083            @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1084            protected QuartzLocalService quartzLocalService;
1085    
1086            private static Log _log = LogFactoryUtil.getLog(
1087                    QuartzSchedulerEngine.class);
1088    
1089            private Scheduler _memoryScheduler;
1090            private Scheduler _persistedScheduler;
1091    
1092    }