001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler.job;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterRequest;
019    import com.liferay.portal.kernel.concurrent.LockRegistry;
020    import com.liferay.portal.kernel.json.JSONFactoryUtil;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBusUtil;
025    import com.liferay.portal.kernel.scheduler.JobState;
026    import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
027    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
028    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
029    import com.liferay.portal.kernel.scheduler.StorageType;
030    import com.liferay.portal.kernel.scheduler.TriggerState;
031    import com.liferay.portal.kernel.util.MethodHandler;
032    import com.liferay.portal.kernel.util.MethodKey;
033    import com.liferay.portal.spring.context.PortletContextLoaderListener;
034    import com.liferay.portal.util.PropsValues;
035    
036    import java.util.Map;
037    import java.util.concurrent.locks.ReentrantLock;
038    
039    import org.quartz.Job;
040    import org.quartz.JobDataMap;
041    import org.quartz.JobDetail;
042    import org.quartz.JobExecutionContext;
043    import org.quartz.Scheduler;
044    import org.quartz.Trigger;
045    import org.quartz.TriggerKey;
046    
047    /**
048     * @author Michael C. Han
049     * @author Bruno Farache
050     */
051    public class MessageSenderJob implements Job {
052    
053            public void execute(JobExecutionContext jobExecutionContext) {
054                    try {
055                            doExecute(jobExecutionContext);
056                    }
057                    catch (Exception e) {
058                            _log.error("Unable to execute job", e);
059                    }
060            }
061    
062            protected void doExecute(JobExecutionContext jobExecutionContext)
063                    throws Exception {
064    
065                    JobDetail jobDetail = jobExecutionContext.getJobDetail();
066    
067                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
068    
069                    String destinationName = jobDataMap.getString(
070                            SchedulerEngine.DESTINATION_NAME);
071    
072                    String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
073    
074                    Message message = null;
075    
076                    if (messageJSON == null) {
077                            message = new Message();
078                    }
079                    else {
080                            message = (Message)JSONFactoryUtil.deserialize(messageJSON);
081                    }
082    
083                    String contextPath = message.getString(SchedulerEngine.CONTEXT_PATH);
084    
085                    String lockKey = PortletContextLoaderListener.getLockKey(contextPath);
086    
087                    ReentrantLock executionLock = null;
088    
089                    if (lockKey != null) {
090                            executionLock = LockRegistry.getLock(lockKey, lockKey);
091    
092                            if (executionLock != null) {
093                                    if (executionLock.hasQueuedThreads()) {
094                                            return;
095                                    }
096    
097                                    executionLock.lock();
098                            }
099                    }
100    
101                    try {
102                            message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
103    
104                            Scheduler scheduler = jobExecutionContext.getScheduler();
105    
106                            Map<String, Object> jobStateMap =
107                                    (Map<String, Object>)jobDataMap.get(SchedulerEngine.JOB_STATE);
108    
109                            JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
110    
111                            if (jobExecutionContext.getNextFireTime() == null) {
112                                    Trigger trigger = jobExecutionContext.getTrigger();
113    
114                                    StorageType storageType = StorageType.valueOf(
115                                            jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
116    
117                                    if (storageType.equals(StorageType.PERSISTED)) {
118                                            JobState jobStateClone = updatePersistedJobState(
119                                                    jobState, trigger);
120    
121                                    jobDataMap.put(
122                                            SchedulerEngine.JOB_STATE,
123                                            JobStateSerializeUtil.serialize(jobStateClone));
124    
125                                            scheduler.addJob(jobDetail, true);
126                                    }
127                                    else {
128                                            message.put(SchedulerEngine.DISABLE, true);
129    
130                                            if (PropsValues.CLUSTER_LINK_ENABLED &&
131                                                    storageType.equals(StorageType.MEMORY_CLUSTERED)) {
132    
133                                                    notifyClusterMember(trigger.getKey(), storageType);
134                                            }
135                                    }
136                            }
137    
138                            message.put(SchedulerEngine.JOB_STATE, jobState);
139    
140                            MessageBusUtil.sendMessage(destinationName, message);
141                    }
142                    finally {
143                            if (executionLock != null) {
144                                    executionLock.unlock();
145                            }
146                    }
147            }
148    
149            protected void notifyClusterMember(
150                            TriggerKey triggerKey, StorageType storageType)
151                    throws Exception {
152    
153                    MethodHandler methodHandler = new MethodHandler(
154                            _deleteJobMethodKey, triggerKey.getName(), triggerKey.getGroup(),
155                            storageType);
156    
157                    ClusterRequest clusterRequest =
158                            ClusterRequest.createMulticastRequest(methodHandler, true);
159    
160                    ClusterExecutorUtil.execute(clusterRequest);
161            }
162    
163            protected JobState updatePersistedJobState(
164                    JobState jobState, Trigger trigger) {
165    
166                    jobState.setTriggerDate(SchedulerEngine.END_TIME, trigger.getEndTime());
167                    jobState.setTriggerDate(
168                            SchedulerEngine.FINAL_FIRE_TIME, trigger.getFinalFireTime());
169                    jobState.setTriggerDate(SchedulerEngine.NEXT_FIRE_TIME, null);
170                    jobState.setTriggerDate(
171                            SchedulerEngine.PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
172                    jobState.setTriggerDate(
173                            SchedulerEngine.START_TIME, trigger.getStartTime());
174                    jobState.setTriggerState(TriggerState.COMPLETE);
175    
176                    JobState jobStateClone = (JobState)jobState.clone();
177    
178                    jobStateClone.clearExceptions();
179    
180                    return jobStateClone;
181            }
182    
183            private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
184    
185            private static MethodKey _deleteJobMethodKey = new MethodKey(
186                    SchedulerEngineUtil.class.getName(), "delete", String.class,
187                    String.class, StorageType.class);
188    
189    }