001
014
015 package com.liferay.portal.scheduler.job;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.concurrent.LockRegistry;
020 import com.liferay.portal.kernel.json.JSONFactoryUtil;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBusUtil;
025 import com.liferay.portal.kernel.scheduler.JobState;
026 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
027 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
028 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
029 import com.liferay.portal.kernel.scheduler.StorageType;
030 import com.liferay.portal.kernel.scheduler.TriggerState;
031 import com.liferay.portal.kernel.util.MethodHandler;
032 import com.liferay.portal.kernel.util.MethodKey;
033 import com.liferay.portal.spring.context.PortletContextLoaderListener;
034 import com.liferay.portal.util.PropsValues;
035
036 import java.util.Map;
037 import java.util.concurrent.locks.ReentrantLock;
038
039 import org.quartz.Job;
040 import org.quartz.JobDataMap;
041 import org.quartz.JobDetail;
042 import org.quartz.JobExecutionContext;
043 import org.quartz.Scheduler;
044 import org.quartz.Trigger;
045 import org.quartz.TriggerKey;
046
047
051 public class MessageSenderJob implements Job {
052
053 public void execute(JobExecutionContext jobExecutionContext) {
054 try {
055 doExecute(jobExecutionContext);
056 }
057 catch (Exception e) {
058 _log.error("Unable to execute job", e);
059 }
060 }
061
062 protected void doExecute(JobExecutionContext jobExecutionContext)
063 throws Exception {
064
065 JobDetail jobDetail = jobExecutionContext.getJobDetail();
066
067 JobDataMap jobDataMap = jobDetail.getJobDataMap();
068
069 String destinationName = jobDataMap.getString(
070 SchedulerEngine.DESTINATION_NAME);
071
072 String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
073
074 Message message = null;
075
076 if (messageJSON == null) {
077 message = new Message();
078 }
079 else {
080 message = (Message)JSONFactoryUtil.deserialize(messageJSON);
081 }
082
083 String contextPath = message.getString(SchedulerEngine.CONTEXT_PATH);
084
085 String lockKey = PortletContextLoaderListener.getLockKey(contextPath);
086
087 ReentrantLock executionLock = null;
088
089 if (lockKey != null) {
090 executionLock = LockRegistry.getLock(lockKey, lockKey);
091
092 if (executionLock != null) {
093 if (executionLock.hasQueuedThreads()) {
094 return;
095 }
096
097 executionLock.lock();
098 }
099 }
100
101 try {
102 message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
103
104 Scheduler scheduler = jobExecutionContext.getScheduler();
105
106 Map<String, Object> jobStateMap =
107 (Map<String, Object>)jobDataMap.get(SchedulerEngine.JOB_STATE);
108
109 JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
110
111 if (jobExecutionContext.getNextFireTime() == null) {
112 Trigger trigger = jobExecutionContext.getTrigger();
113
114 StorageType storageType = StorageType.valueOf(
115 jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
116
117 if (storageType.equals(StorageType.PERSISTED)) {
118 JobState jobStateClone = updatePersistedJobState(
119 jobState, trigger);
120
121 jobDataMap.put(
122 SchedulerEngine.JOB_STATE,
123 JobStateSerializeUtil.serialize(jobStateClone));
124
125 scheduler.addJob(jobDetail, true);
126 }
127 else {
128 message.put(SchedulerEngine.DISABLE, true);
129
130 if (PropsValues.CLUSTER_LINK_ENABLED &&
131 storageType.equals(StorageType.MEMORY_CLUSTERED)) {
132
133 notifyClusterMember(trigger.getKey(), storageType);
134 }
135 }
136 }
137
138 message.put(SchedulerEngine.JOB_STATE, jobState);
139
140 MessageBusUtil.sendMessage(destinationName, message);
141 }
142 finally {
143 if (executionLock != null) {
144 executionLock.unlock();
145 }
146 }
147 }
148
149 protected void notifyClusterMember(
150 TriggerKey triggerKey, StorageType storageType)
151 throws Exception {
152
153 MethodHandler methodHandler = new MethodHandler(
154 _deleteJobMethodKey, triggerKey.getName(), triggerKey.getGroup(),
155 storageType);
156
157 ClusterRequest clusterRequest =
158 ClusterRequest.createMulticastRequest(methodHandler, true);
159
160 ClusterExecutorUtil.execute(clusterRequest);
161 }
162
163 protected JobState updatePersistedJobState(
164 JobState jobState, Trigger trigger) {
165
166 jobState.setTriggerDate(SchedulerEngine.END_TIME, trigger.getEndTime());
167 jobState.setTriggerDate(
168 SchedulerEngine.FINAL_FIRE_TIME, trigger.getFinalFireTime());
169 jobState.setTriggerDate(SchedulerEngine.NEXT_FIRE_TIME, null);
170 jobState.setTriggerDate(
171 SchedulerEngine.PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
172 jobState.setTriggerDate(
173 SchedulerEngine.START_TIME, trigger.getStartTime());
174 jobState.setTriggerState(TriggerState.COMPLETE);
175
176 JobState jobStateClone = (JobState)jobState.clone();
177
178 jobStateClone.clearExceptions();
179
180 return jobStateClone;
181 }
182
183 private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
184
185 private static MethodKey _deleteJobMethodKey = new MethodKey(
186 SchedulerEngineUtil.class.getName(), "delete", String.class,
187 String.class, StorageType.class);
188
189 }