001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler.quartz;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019    import com.liferay.portal.kernel.dao.db.DB;
020    import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021    import com.liferay.portal.kernel.json.JSONFactoryUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.Destination;
025    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026    import com.liferay.portal.kernel.messaging.Message;
027    import com.liferay.portal.kernel.messaging.MessageBus;
028    import com.liferay.portal.kernel.messaging.MessageBusUtil;
029    import com.liferay.portal.kernel.messaging.MessageListener;
030    import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031    import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032    import com.liferay.portal.kernel.scheduler.JobState;
033    import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036    import com.liferay.portal.kernel.scheduler.SchedulerException;
037    import com.liferay.portal.kernel.scheduler.StorageType;
038    import com.liferay.portal.kernel.scheduler.TriggerState;
039    import com.liferay.portal.kernel.scheduler.TriggerType;
040    import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
041    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
042    import com.liferay.portal.kernel.util.CharPool;
043    import com.liferay.portal.kernel.util.ClassLoaderPool;
044    import com.liferay.portal.kernel.util.ProxyUtil;
045    import com.liferay.portal.kernel.util.Validator;
046    import com.liferay.portal.scheduler.job.MessageSenderJob;
047    import com.liferay.portal.service.QuartzLocalService;
048    import com.liferay.portal.util.ClassLoaderUtil;
049    import com.liferay.portal.util.PropsUtil;
050    import com.liferay.portal.util.PropsValues;
051    
052    import java.util.ArrayList;
053    import java.util.Collections;
054    import java.util.Date;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Properties;
058    import java.util.Set;
059    
060    import org.quartz.CronScheduleBuilder;
061    import org.quartz.CronTrigger;
062    import org.quartz.JobBuilder;
063    import org.quartz.JobDataMap;
064    import org.quartz.JobDetail;
065    import org.quartz.JobKey;
066    import org.quartz.ObjectAlreadyExistsException;
067    import org.quartz.Scheduler;
068    import org.quartz.SimpleScheduleBuilder;
069    import org.quartz.SimpleTrigger;
070    import org.quartz.Trigger;
071    import org.quartz.TriggerBuilder;
072    import org.quartz.TriggerKey;
073    import org.quartz.impl.StdSchedulerFactory;
074    import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
075    import org.quartz.impl.matchers.GroupMatcher;
076    
077    /**
078     * @author Michael C. Han
079     * @author Bruno Farache
080     * @author Shuyang Zhou
081     * @author Wesley Gong
082     * @author Tina Tian
083     */
084    public class QuartzSchedulerEngine implements SchedulerEngine {
085    
086            public void afterPropertiesSet() {
087                    if (!PropsValues.SCHEDULER_ENABLED) {
088                            return;
089                    }
090    
091                    try {
092                            quartzLocalService.checkQuartzTables();
093    
094                            _persistedScheduler = initializeScheduler(
095                                    "persisted.scheduler.", true);
096    
097                            _memoryScheduler = initializeScheduler("memory.scheduler.", false);
098                    }
099                    catch (Exception e) {
100                            _log.error("Unable to initialize engine", e);
101                    }
102            }
103    
104            @Override
105            public void delete(String groupName, StorageType storageType)
106                    throws SchedulerException {
107    
108                    if (!PropsValues.SCHEDULER_ENABLED) {
109                            return;
110                    }
111    
112                    try {
113                            Scheduler scheduler = getScheduler(storageType);
114    
115                            groupName = fixMaxLength(
116                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
117    
118                            Set<JobKey> jobKeys = scheduler.getJobKeys(
119                                    GroupMatcher.jobGroupEquals(groupName));
120    
121                            for (JobKey jobKey : jobKeys) {
122                                    unregisterMessageListener(scheduler, jobKey);
123    
124                                    scheduler.deleteJob(jobKey);
125                            }
126                    }
127                    catch (Exception e) {
128                            throw new SchedulerException(
129                                    "Unable to delete jobs in group " + groupName, e);
130                    }
131            }
132    
133            @Override
134            public void delete(
135                            String jobName, String groupName, StorageType storageType)
136                    throws SchedulerException {
137    
138                    if (!PropsValues.SCHEDULER_ENABLED) {
139                            return;
140                    }
141    
142                    try {
143                            Scheduler scheduler = getScheduler(storageType);
144    
145                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
146                            groupName = fixMaxLength(
147                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
148    
149                            JobKey jobKey = new JobKey(jobName, groupName);
150    
151                            unregisterMessageListener(scheduler, jobKey);
152    
153                            scheduler.deleteJob(jobKey);
154                    }
155                    catch (Exception e) {
156                            throw new SchedulerException(
157                                    "Unable to delete job {jobName=" + jobName + ", groupName=" +
158                                            groupName + "}",
159                                    e);
160                    }
161            }
162    
163            public void destroy() {
164                    try {
165                            shutdown();
166                    }
167                    catch (SchedulerException se) {
168                            if (_log.isWarnEnabled()) {
169                                    _log.warn("Unable to shutdown", se);
170                            }
171                    }
172            }
173    
174            @Override
175            public SchedulerResponse getScheduledJob(
176                            String jobName, String groupName, StorageType storageType)
177                    throws SchedulerException {
178    
179                    if (!PropsValues.SCHEDULER_ENABLED) {
180                            return null;
181                    }
182    
183                    try {
184                            Scheduler scheduler = getScheduler(storageType);
185    
186                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
187                            groupName = fixMaxLength(
188                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
189    
190                            JobKey jobKey = new JobKey(jobName, groupName);
191    
192                            return getScheduledJob(scheduler, jobKey);
193                    }
194                    catch (Exception e) {
195                            throw new SchedulerException(
196                                    "Unable to get job {jobName=" + jobName + ", groupName=" +
197                                            groupName + "}",
198                                    e);
199                    }
200            }
201    
202            @Override
203            public List<SchedulerResponse> getScheduledJobs()
204                    throws SchedulerException {
205    
206                    if (!PropsValues.SCHEDULER_ENABLED) {
207                            return Collections.emptyList();
208                    }
209    
210                    try {
211                            List<String> groupNames = _persistedScheduler.getJobGroupNames();
212    
213                            List<SchedulerResponse> schedulerResponses =
214                                    new ArrayList<SchedulerResponse>();
215    
216                            for (String groupName : groupNames) {
217                                    schedulerResponses.addAll(
218                                            getScheduledJobs(_persistedScheduler, groupName, null));
219                            }
220    
221                            groupNames = _memoryScheduler.getJobGroupNames();
222    
223                            for (String groupName : groupNames) {
224                                    schedulerResponses.addAll(
225                                            getScheduledJobs(_memoryScheduler, groupName, null));
226                            }
227    
228                            return schedulerResponses;
229                    }
230                    catch (Exception e) {
231                            throw new SchedulerException("Unable to get jobs", e);
232                    }
233            }
234    
235            @Override
236            public List<SchedulerResponse> getScheduledJobs(StorageType storageType)
237                    throws SchedulerException {
238    
239                    if (!PropsValues.SCHEDULER_ENABLED) {
240                            return Collections.emptyList();
241                    }
242    
243                    try {
244                            Scheduler scheduler = getScheduler(storageType);
245    
246                            List<String> groupNames = scheduler.getJobGroupNames();
247    
248                            List<SchedulerResponse> schedulerResponses =
249                                    new ArrayList<SchedulerResponse>();
250    
251                            for (String groupName : groupNames) {
252                                    schedulerResponses.addAll(
253                                            getScheduledJobs(scheduler, groupName, storageType));
254                            }
255    
256                            return schedulerResponses;
257                    }
258                    catch (Exception e) {
259                            throw new SchedulerException(
260                                    "Unable to get jobs with type " + storageType, e);
261                    }
262            }
263    
264            @Override
265            public List<SchedulerResponse> getScheduledJobs(
266                            String groupName, StorageType storageType)
267                    throws SchedulerException {
268    
269                    if (!PropsValues.SCHEDULER_ENABLED) {
270                            return Collections.emptyList();
271                    }
272    
273                    try {
274                            Scheduler scheduler = getScheduler(storageType);
275    
276                            return getScheduledJobs(scheduler, groupName, storageType);
277                    }
278                    catch (Exception e) {
279                            throw new SchedulerException(
280                                    "Unable to get jobs in group " + groupName, e);
281                    }
282            }
283    
284            @Override
285            public void pause(String groupName, StorageType storageType)
286                    throws SchedulerException {
287    
288                    if (!PropsValues.SCHEDULER_ENABLED) {
289                            return;
290                    }
291    
292                    try {
293                            Scheduler scheduler = getScheduler(storageType);
294    
295                            groupName = fixMaxLength(
296                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
297    
298                            Set<JobKey> jobKeys = scheduler.getJobKeys(
299                                    GroupMatcher.jobGroupEquals(groupName));
300    
301                            scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
302    
303                            for (JobKey jobKey : jobKeys) {
304                                    updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
305                            }
306                    }
307                    catch (Exception e) {
308                            throw new SchedulerException(
309                                    "Unable to pause jobs in group " + groupName, e);
310                    }
311            }
312    
313            @Override
314            public void pause(String jobName, String groupName, StorageType storageType)
315                    throws SchedulerException {
316    
317                    if (!PropsValues.SCHEDULER_ENABLED) {
318                            return;
319                    }
320    
321                    try {
322                            Scheduler scheduler = getScheduler(storageType);
323    
324                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
325                            groupName = fixMaxLength(
326                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
327    
328                            JobKey jobKey = new JobKey(jobName, groupName);
329    
330                            scheduler.pauseJob(jobKey);
331    
332                            updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
333                    }
334                    catch (Exception e) {
335                            throw new SchedulerException(
336                                    "Unable to pause job {jobName=" + jobName + ", groupName=" +
337                                            groupName + "}",
338                                    e);
339                    }
340            }
341    
342            @Override
343            public void resume(String groupName, StorageType storageType)
344                    throws SchedulerException {
345    
346                    if (!PropsValues.SCHEDULER_ENABLED) {
347                            return;
348                    }
349    
350                    try {
351                            Scheduler scheduler = getScheduler(storageType);
352    
353                            groupName = fixMaxLength(
354                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
355    
356                            Set<JobKey> jobKeys = scheduler.getJobKeys(
357                                    GroupMatcher.jobGroupEquals(groupName));
358    
359                            scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
360    
361                            for (JobKey jobKey : jobKeys) {
362                                    updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
363                            }
364                    }
365                    catch (Exception e) {
366                            throw new SchedulerException(
367                                    "Unable to resume jobs in group " + groupName, e);
368                    }
369            }
370    
371            @Override
372            public void resume(
373                            String jobName, String groupName, StorageType storageType)
374                    throws SchedulerException {
375    
376                    if (!PropsValues.SCHEDULER_ENABLED) {
377                            return;
378                    }
379    
380                    try {
381                            Scheduler scheduler = getScheduler(storageType);
382    
383                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
384                            groupName = fixMaxLength(
385                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
386    
387                            JobKey jobKey = new JobKey(jobName, groupName);
388    
389                            scheduler.resumeJob(jobKey);
390    
391                            updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
392                    }
393                    catch (Exception e) {
394                            throw new SchedulerException(
395                                    "Unable to resume job {jobName=" + jobName + ", groupName=" +
396                                            groupName + "}",
397                                    e);
398                    }
399            }
400    
401            @Override
402            public void schedule(
403                            com.liferay.portal.kernel.scheduler.Trigger trigger,
404                            String description, String destination, Message message,
405                            StorageType storageType)
406                    throws SchedulerException {
407    
408                    if (!PropsValues.SCHEDULER_ENABLED) {
409                            return;
410                    }
411    
412                    try {
413                            Scheduler scheduler = getScheduler(storageType);
414    
415                            Trigger quartzTrigger = getQuartzTrigger(trigger, storageType);
416    
417                            if (quartzTrigger == null) {
418                                    return;
419                            }
420    
421                            description = fixMaxLength(
422                                    description, DESCRIPTION_MAX_LENGTH, storageType);
423    
424                            if (message == null) {
425                                    message = new Message();
426                            }
427                            else {
428                                    message = message.clone();
429                            }
430    
431                            registerMessageListeners(destination, message);
432    
433                            schedule(
434                                    scheduler, storageType, quartzTrigger, description, destination,
435                                    message);
436                    }
437                    catch (RuntimeException re) {
438    
439                            // ServerDetector will throw an exception when JobSchedulerImpl is
440                            // initialized in a test environment
441    
442                    }
443                    catch (Exception e) {
444                            throw new SchedulerException("Unable to schedule job", e);
445                    }
446            }
447    
448            @Override
449            public void shutdown() throws SchedulerException {
450                    if (!PropsValues.SCHEDULER_ENABLED) {
451                            return;
452                    }
453    
454                    try {
455                            if (!_persistedScheduler.isShutdown()) {
456                                    _persistedScheduler.shutdown(false);
457                            }
458    
459                            if (!_memoryScheduler.isShutdown()) {
460                                    _memoryScheduler.shutdown(false);
461                            }
462                    }
463                    catch (Exception e) {
464                            throw new SchedulerException("Unable to shutdown scheduler", e);
465                    }
466            }
467    
468            @Override
469            public void start() throws SchedulerException {
470                    if (!PropsValues.SCHEDULER_ENABLED) {
471                            return;
472                    }
473    
474                    try {
475                            _persistedScheduler.start();
476    
477                            initJobState();
478    
479                            _memoryScheduler.start();
480                    }
481                    catch (Exception e) {
482                            throw new SchedulerException("Unable to start scheduler", e);
483                    }
484            }
485    
486            @Override
487            public void suppressError(
488                            String jobName, String groupName, StorageType storageType)
489                    throws SchedulerException {
490    
491                    if (!PropsValues.SCHEDULER_ENABLED) {
492                            return;
493                    }
494    
495                    try {
496                            Scheduler scheduler = getScheduler(storageType);
497    
498                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
499                            groupName = fixMaxLength(
500                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
501    
502                            JobKey jobKey = new JobKey(jobName, groupName);
503    
504                            updateJobState(scheduler, jobKey, null, true);
505                    }
506                    catch (Exception e) {
507                            throw new SchedulerException(
508                                    "Unable to suppress error for job {jobName=" + jobName +
509                                            ", groupName=" + groupName + "}",
510                                    e);
511                    }
512            }
513    
514            @Override
515            public void unschedule(String groupName, StorageType storageType)
516                    throws SchedulerException {
517    
518                    if (!PropsValues.SCHEDULER_ENABLED) {
519                            return;
520                    }
521    
522                    try {
523                            Scheduler scheduler = getScheduler(storageType);
524    
525                            groupName = fixMaxLength(
526                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
527    
528                            Set<JobKey> jobKeys = scheduler.getJobKeys(
529                                    GroupMatcher.jobGroupEquals(groupName));
530    
531                            for (JobKey jobKey : jobKeys) {
532                                    unschedule(scheduler, jobKey);
533                            }
534                    }
535                    catch (Exception e) {
536                            throw new SchedulerException(
537                                    "Unable to unschedule jobs in group " + groupName, e);
538                    }
539            }
540    
541            @Override
542            public void unschedule(
543                            String jobName, String groupName, StorageType storageType)
544                    throws SchedulerException {
545    
546                    if (!PropsValues.SCHEDULER_ENABLED) {
547                            return;
548                    }
549    
550                    try {
551                            Scheduler scheduler = getScheduler(storageType);
552    
553                            jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH, storageType);
554                            groupName = fixMaxLength(
555                                    groupName, GROUP_NAME_MAX_LENGTH, storageType);
556    
557                            JobKey jobKey = new JobKey(jobName, groupName);
558    
559                            unschedule(scheduler, jobKey);
560                    }
561                    catch (Exception e) {
562                            throw new SchedulerException(
563                                    "Unable to unschedule job {jobName=" + jobName +
564                                            ", groupName=" + groupName + "}",
565                                    e);
566                    }
567            }
568    
569            @Override
570            public void update(
571                            com.liferay.portal.kernel.scheduler.Trigger trigger,
572                            StorageType storageType)
573                    throws SchedulerException {
574    
575                    if (!PropsValues.SCHEDULER_ENABLED) {
576                            return;
577                    }
578    
579                    try {
580                            Scheduler scheduler = getScheduler(storageType);
581    
582                            update(scheduler, trigger, storageType);
583                    }
584                    catch (Exception e) {
585                            throw new SchedulerException("Unable to update trigger", e);
586                    }
587            }
588    
589            protected String fixMaxLength(
590                    String argument, int maxLength, StorageType storageType) {
591    
592                    if ((argument == null) || (storageType != StorageType.PERSISTED)) {
593                            return argument;
594                    }
595    
596                    if (argument.length() > maxLength) {
597                            argument = argument.substring(0, maxLength);
598                    }
599    
600                    return argument;
601            }
602    
603            protected JobState getJobState(JobDataMap jobDataMap) {
604                    Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
605                            JOB_STATE);
606    
607                    return JobStateSerializeUtil.deserialize(jobStateMap);
608            }
609    
610            protected Message getMessage(JobDataMap jobDataMap) {
611                    String messageJSON = (String)jobDataMap.get(MESSAGE);
612    
613                    return (Message)JSONFactoryUtil.deserialize(messageJSON);
614            }
615    
616            protected MessageListener getMessageListener(
617                            String messageListenerClassName, ClassLoader classLoader)
618                    throws SchedulerException {
619    
620                    MessageListener schedulerEventListener = null;
621    
622                    try {
623                            Class<? extends MessageListener> clazz =
624                                    (Class<? extends MessageListener>)classLoader.loadClass(
625                                            messageListenerClassName);
626    
627                            schedulerEventListener = clazz.newInstance();
628    
629                            schedulerEventListener =
630                                    (MessageListener)ProxyUtil.newProxyInstance(
631                                            classLoader, new Class<?>[] {MessageListener.class},
632                                            new ClassLoaderBeanHandler(
633                                                    schedulerEventListener, classLoader));
634                    }
635                    catch (Exception e) {
636                            throw new SchedulerException(
637                                    "Unable to register message listener with name " +
638                                            messageListenerClassName,
639                                    e);
640                    }
641    
642                    return schedulerEventListener;
643            }
644    
645            protected Trigger getQuartzTrigger(
646                            com.liferay.portal.kernel.scheduler.Trigger trigger,
647                            StorageType storageType)
648                    throws SchedulerException {
649    
650                    if (trigger == null) {
651                            return null;
652                    }
653    
654                    Date endDate = trigger.getEndDate();
655                    String jobName = fixMaxLength(
656                            trigger.getJobName(), JOB_NAME_MAX_LENGTH, storageType);
657                    String groupName = fixMaxLength(
658                            trigger.getGroupName(), GROUP_NAME_MAX_LENGTH, storageType);
659    
660                    Date startDate = trigger.getStartDate();
661    
662                    if (startDate == null) {
663                            startDate = new Date(System.currentTimeMillis());
664                    }
665    
666                    Trigger quartzTrigger = null;
667    
668                    TriggerType triggerType = trigger.getTriggerType();
669    
670                    if (triggerType.equals(TriggerType.CRON)) {
671                            TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
672    
673                            triggerBuilder.endAt(endDate);
674                            triggerBuilder.forJob(jobName, groupName);
675                            triggerBuilder.startAt(startDate);
676                            triggerBuilder.withIdentity(jobName, groupName);
677    
678                            CronScheduleBuilder cronScheduleBuilder =
679                                    CronScheduleBuilder.cronSchedule(
680                                            (String)trigger.getTriggerContent());
681    
682                            triggerBuilder.withSchedule(cronScheduleBuilder);
683    
684                            quartzTrigger = triggerBuilder.build();
685                    }
686                    else if (triggerType.equals(TriggerType.SIMPLE)) {
687                            long interval = (Long)trigger.getTriggerContent();
688    
689                            if (interval <= 0) {
690                                    if (_log.isDebugEnabled()) {
691                                            _log.debug(
692                                                    "Not scheduling " + trigger.getJobName() +
693                                                            " because interval is less than or equal to 0");
694                                    }
695    
696                                    return null;
697                            }
698    
699                            TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
700    
701                            triggerBuilder.endAt(endDate);
702                            triggerBuilder.forJob(jobName, groupName);
703                            triggerBuilder.startAt(startDate);
704                            triggerBuilder.withIdentity(jobName, groupName);
705    
706                            SimpleScheduleBuilder simpleScheduleBuilder =
707                                    SimpleScheduleBuilder.simpleSchedule();
708    
709                            simpleScheduleBuilder.withIntervalInMilliseconds(interval);
710                            simpleScheduleBuilder.withRepeatCount(
711                                    SimpleTrigger.REPEAT_INDEFINITELY);
712    
713                            triggerBuilder.withSchedule(simpleScheduleBuilder);
714    
715                            quartzTrigger = triggerBuilder.build();
716                    }
717                    else {
718                            throw new SchedulerException(
719                                    "Unknown trigger type " + trigger.getTriggerType());
720                    }
721    
722                    return quartzTrigger;
723            }
724    
725            protected SchedulerResponse getScheduledJob(
726                            Scheduler scheduler, JobKey jobKey)
727                    throws Exception {
728    
729                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
730    
731                    if (jobDetail == null) {
732                            return null;
733                    }
734    
735                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
736    
737                    String description = jobDataMap.getString(DESCRIPTION);
738                    String destinationName = jobDataMap.getString(DESTINATION_NAME);
739                    Message message = getMessage(jobDataMap);
740                    StorageType storageType = StorageType.valueOf(
741                            jobDataMap.getString(STORAGE_TYPE));
742    
743                    SchedulerResponse schedulerResponse = null;
744    
745                    String jobName = jobKey.getName();
746                    String groupName = jobKey.getGroup();
747    
748                    TriggerKey triggerKey = new TriggerKey(jobName, groupName);
749    
750                    Trigger trigger = scheduler.getTrigger(triggerKey);
751    
752                    JobState jobState = getJobState(jobDataMap);
753    
754                    message.put(JOB_STATE, jobState);
755    
756                    if (trigger == null) {
757                            schedulerResponse = new SchedulerResponse();
758    
759                            schedulerResponse.setDescription(description);
760                            schedulerResponse.setDestinationName(destinationName);
761                            schedulerResponse.setGroupName(groupName);
762                            schedulerResponse.setJobName(jobName);
763                            schedulerResponse.setMessage(message);
764                            schedulerResponse.setStorageType(storageType);
765                    }
766                    else {
767                            message.put(END_TIME, trigger.getEndTime());
768                            message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
769                            message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
770                            message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
771                            message.put(START_TIME, trigger.getStartTime());
772    
773                            if (trigger instanceof CronTrigger) {
774                                    CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
775    
776                                    schedulerResponse = new SchedulerResponse();
777    
778                                    schedulerResponse.setDescription(description);
779                                    schedulerResponse.setDestinationName(destinationName);
780                                    schedulerResponse.setMessage(message);
781                                    schedulerResponse.setStorageType(storageType);
782                                    schedulerResponse.setTrigger(
783                                            new com.liferay.portal.kernel.scheduler.CronTrigger(
784                                                    jobName, groupName, cronTrigger.getStartTime(),
785                                                    cronTrigger.getEndTime(),
786                                                    cronTrigger.getCronExpression()));
787                            }
788                            else if (trigger instanceof SimpleTrigger) {
789                                    SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
790    
791                                    schedulerResponse = new SchedulerResponse();
792    
793                                    schedulerResponse.setDescription(description);
794                                    schedulerResponse.setDestinationName(destinationName);
795                                    schedulerResponse.setMessage(message);
796                                    schedulerResponse.setStorageType(storageType);
797                                    schedulerResponse.setTrigger(
798                                            new IntervalTrigger(
799                                                    jobName, groupName, simpleTrigger.getStartTime(),
800                                                    simpleTrigger.getEndTime(),
801                                                    simpleTrigger.getRepeatInterval()));
802                            }
803                    }
804    
805                    return schedulerResponse;
806            }
807    
808            protected List<SchedulerResponse> getScheduledJobs(
809                            Scheduler scheduler, String groupName, StorageType storageType)
810                    throws Exception {
811    
812                    groupName = fixMaxLength(groupName, GROUP_NAME_MAX_LENGTH, storageType);
813    
814                    List<SchedulerResponse> schedulerResponses =
815                            new ArrayList<SchedulerResponse>();
816    
817                    Set<JobKey> jobKeys = scheduler.getJobKeys(
818                            GroupMatcher.jobGroupEquals(groupName));
819    
820                    for (JobKey jobKey : jobKeys) {
821                            SchedulerResponse schedulerResponse = getScheduledJob(
822                                    scheduler, jobKey);
823    
824                            if ((schedulerResponse != null) &&
825                                    ((storageType == null) ||
826                                     (storageType == schedulerResponse.getStorageType()))) {
827    
828                                    schedulerResponses.add(schedulerResponse);
829                            }
830                    }
831    
832                    return schedulerResponses;
833            }
834    
835            protected Scheduler getScheduler(StorageType storageType) {
836                    if (storageType == StorageType.PERSISTED) {
837                            return _persistedScheduler;
838                    }
839                    else {
840                            return _memoryScheduler;
841                    }
842            }
843    
844            protected StorageType getStorageType(String groupName) {
845                    int pos = groupName.indexOf(CharPool.POUND);
846    
847                    String storageTypeString = groupName.substring(0, pos);
848    
849                    return StorageType.valueOf(storageTypeString);
850            }
851    
852            protected Scheduler initializeScheduler(
853                            String propertiesPrefix, boolean useQuartzCluster)
854                    throws Exception {
855    
856                    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
857    
858                    Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
859    
860                    if (useQuartzCluster) {
861                            DB db = DBFactoryUtil.getDB();
862    
863                            String dbType = db.getType();
864    
865                            if (dbType.equals(DB.TYPE_SQLSERVER)) {
866                                    String lockHandlerClassName = properties.getProperty(
867                                            "org.quartz.jobStore.lockHandler.class");
868    
869                                    if (Validator.isNull(lockHandlerClassName)) {
870                                            properties.setProperty(
871                                                    "org.quartz.jobStore.lockHandler.class",
872                                                    UpdateLockRowSemaphore.class.getName());
873                                    }
874                            }
875    
876                            if (PropsValues.CLUSTER_LINK_ENABLED) {
877                                    if (dbType.equals(DB.TYPE_HYPERSONIC)) {
878                                            _log.error("Unable to cluster scheduler on Hypersonic");
879                                    }
880                                    else {
881                                            properties.put(
882                                                    "org.quartz.jobStore.isClustered",
883                                                    Boolean.TRUE.toString());
884                                    }
885                            }
886                    }
887    
888                    schedulerFactory.initialize(properties);
889    
890                    return schedulerFactory.getScheduler();
891            }
892    
893            protected void initJobState() throws Exception {
894                    List<String> groupNames = _persistedScheduler.getJobGroupNames();
895    
896                    for (String groupName : groupNames) {
897                            Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
898                                    GroupMatcher.jobGroupEquals(groupName));
899    
900                            for (JobKey jobKey : jobkeys) {
901                                    Trigger trigger = _persistedScheduler.getTrigger(
902                                            new TriggerKey(jobKey.getName(), jobKey.getGroup()));
903    
904                                    if (trigger != null) {
905                                            continue;
906                                    }
907    
908                                    JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
909    
910                                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
911    
912                                    Message message = getMessage(jobDataMap);
913    
914                                    message.put(JOB_NAME, jobKey.getName());
915                                    message.put(GROUP_NAME, jobKey.getGroup());
916    
917                                    SchedulerEngineHelperUtil.auditSchedulerJobs(
918                                            message, TriggerState.EXPIRED);
919    
920                                    _persistedScheduler.deleteJob(jobKey);
921                            }
922                    }
923            }
924    
925            protected void registerMessageListeners(
926                            String destinationName, Message message)
927                    throws SchedulerException {
928    
929                    String messageListenerClassName = message.getString(
930                            MESSAGE_LISTENER_CLASS_NAME);
931    
932                    if (Validator.isNull(messageListenerClassName)) {
933                            return;
934                    }
935    
936                    String portletId = message.getString(PORTLET_ID);
937    
938                    ClassLoader classLoader = null;
939    
940                    if (Validator.isNull(portletId)) {
941                            classLoader = ClassLoaderUtil.getPortalClassLoader();
942                    }
943                    else {
944                            classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
945    
946                            if (classLoader == null) {
947    
948                                    // No class loader found for the portlet ID, try getting the
949                                    // class loader where we assume the portlet ID is really a
950                                    // servlet context name
951    
952                                    classLoader = ClassLoaderPool.getClassLoader(portletId);
953                            }
954                    }
955    
956                    if (classLoader == null) {
957                            throw new SchedulerException(
958                                    "Unable to find class loader for portlet " + portletId);
959                    }
960    
961                    MessageListener schedulerEventListener = getMessageListener(
962                            messageListenerClassName, classLoader);
963    
964                    SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
965                            new SchedulerEventMessageListenerWrapper();
966    
967                    schedulerEventListenerWrapper.setMessageListener(
968                            schedulerEventListener);
969    
970                    schedulerEventListenerWrapper.afterPropertiesSet();
971    
972                    MessageBusUtil.registerMessageListener(
973                            destinationName, schedulerEventListenerWrapper);
974    
975                    message.put(
976                            MESSAGE_LISTENER_UUID,
977                            schedulerEventListenerWrapper.getMessageListenerUUID());
978            }
979    
980            protected void schedule(
981                            Scheduler scheduler, StorageType storageType, Trigger trigger,
982                            String description, String destinationName, Message message)
983                    throws Exception {
984    
985                    try {
986                            JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
987    
988                            jobBuilder.withIdentity(trigger.getJobKey());
989    
990                            jobBuilder.storeDurably();
991    
992                            JobDetail jobDetail = jobBuilder.build();
993    
994                            JobDataMap jobDataMap = jobDetail.getJobDataMap();
995    
996                            jobDataMap.put(DESCRIPTION, description);
997                            jobDataMap.put(DESTINATION_NAME, destinationName);
998                            jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
999                            jobDataMap.put(STORAGE_TYPE, storageType.toString());
1000    
1001                            JobState jobState = new JobState(
1002                                    TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
1003    
1004                            jobDataMap.put(
1005                                    JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1006    
1007                            unregisterMessageListener(scheduler, trigger.getJobKey());
1008    
1009                            synchronized (this) {
1010                                    scheduler.deleteJob(trigger.getJobKey());
1011                                    scheduler.scheduleJob(jobDetail, trigger);
1012                            }
1013                    }
1014                    catch (ObjectAlreadyExistsException oaee) {
1015                            if (_log.isInfoEnabled()) {
1016                                    _log.info("Message is already scheduled");
1017                            }
1018                    }
1019            }
1020    
1021            protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
1022                    throws Exception {
1023    
1024                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1025    
1026                    if (jobDetail == null) {
1027                            return;
1028                    }
1029    
1030                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
1031    
1032                    if (jobDataMap == null) {
1033                            return;
1034                    }
1035    
1036                    Message message = getMessage(jobDataMap);
1037    
1038                    String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1039    
1040                    if (messageListenerUUID == null) {
1041                            return;
1042                    }
1043    
1044                    String destinationName = jobDataMap.getString(DESTINATION_NAME);
1045    
1046                    MessageBus messageBus = MessageBusUtil.getMessageBus();
1047    
1048                    Destination destination = messageBus.getDestination(destinationName);
1049    
1050                    if (destination == null) {
1051                            return;
1052                    }
1053    
1054                    Set<MessageListener> messageListeners =
1055                            destination.getMessageListeners();
1056    
1057                    for (MessageListener messageListener : messageListeners) {
1058                            if (!(messageListener instanceof InvokerMessageListener)) {
1059                                    continue;
1060                            }
1061    
1062                            InvokerMessageListener invokerMessageListener =
1063                                    (InvokerMessageListener)messageListener;
1064    
1065                            messageListener = invokerMessageListener.getMessageListener();
1066    
1067                            if (!(messageListener instanceof
1068                                            SchedulerEventMessageListenerWrapper)) {
1069    
1070                                    continue;
1071                            }
1072    
1073                            SchedulerEventMessageListenerWrapper schedulerMessageListener =
1074                                    (SchedulerEventMessageListenerWrapper)messageListener;
1075    
1076                            if (messageListenerUUID.equals(
1077                                            schedulerMessageListener.getMessageListenerUUID())) {
1078    
1079                                    messageBus.unregisterMessageListener(
1080                                            destinationName, schedulerMessageListener);
1081    
1082                                    return;
1083                            }
1084                    }
1085            }
1086    
1087            protected void unschedule(Scheduler scheduler, JobKey jobKey)
1088                    throws Exception {
1089    
1090                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1091    
1092                    TriggerKey triggerKey = new TriggerKey(
1093                            jobKey.getName(), jobKey.getGroup());
1094    
1095                    if (jobDetail == null) {
1096                            return;
1097                    }
1098    
1099                    unregisterMessageListener(scheduler, jobKey);
1100    
1101                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
1102    
1103                    JobState jobState = getJobState(jobDataMap);
1104    
1105                    Trigger trigger = scheduler.getTrigger(triggerKey);
1106    
1107                    jobState.setTriggerDate(END_TIME, new Date());
1108                    jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1109                    jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1110                    jobState.setTriggerDate(
1111                            PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1112                    jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1113    
1114                    jobState.setTriggerState(TriggerState.UNSCHEDULED);
1115    
1116                    jobState.clearExceptions();
1117    
1118                    jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1119    
1120                    scheduler.unscheduleJob(triggerKey);
1121    
1122                    scheduler.addJob(jobDetail, true);
1123            }
1124    
1125            protected void update(
1126                            Scheduler scheduler,
1127                            com.liferay.portal.kernel.scheduler.Trigger trigger,
1128                            StorageType storageType)
1129                    throws Exception {
1130    
1131                    Trigger quartzTrigger = getQuartzTrigger(trigger, storageType);
1132    
1133                    if (quartzTrigger == null) {
1134                            return;
1135                    }
1136    
1137                    TriggerKey triggerKey = quartzTrigger.getKey();
1138    
1139                    if (scheduler.getTrigger(triggerKey) != null) {
1140                            scheduler.rescheduleJob(triggerKey, quartzTrigger);
1141                    }
1142                    else {
1143                            JobKey jobKey = quartzTrigger.getJobKey();
1144    
1145                            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1146    
1147                            if (jobDetail == null) {
1148                                    return;
1149                            }
1150    
1151                            synchronized (this) {
1152                                    scheduler.deleteJob(jobKey);
1153                                    scheduler.scheduleJob(jobDetail, quartzTrigger);
1154                            }
1155    
1156                            updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1157                    }
1158            }
1159    
1160            protected void updateJobState(
1161                            Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1162                            boolean suppressError)
1163                    throws Exception {
1164    
1165                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1166    
1167                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
1168    
1169                    JobState jobState = getJobState(jobDataMap);
1170    
1171                    if (triggerState != null) {
1172                            jobState.setTriggerState(triggerState);
1173                    }
1174    
1175                    if (suppressError) {
1176                            jobState.clearExceptions();
1177                    }
1178    
1179                    jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1180    
1181                    scheduler.addJob(jobDetail, true);
1182            }
1183    
1184            @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1185            protected QuartzLocalService quartzLocalService;
1186    
1187            private static final Log _log = LogFactoryUtil.getLog(
1188                    QuartzSchedulerEngine.class);
1189    
1190            private Scheduler _memoryScheduler;
1191            private Scheduler _persistedScheduler;
1192    
1193    }