001
014
015 package com.liferay.portal.scheduler.job;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.messaging.MessageBusUtil;
024 import com.liferay.portal.kernel.scheduler.JobState;
025 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
026 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
027 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
028 import com.liferay.portal.kernel.scheduler.StorageType;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.util.Map;
034
035 import org.quartz.Job;
036 import org.quartz.JobDataMap;
037 import org.quartz.JobDetail;
038 import org.quartz.JobExecutionContext;
039 import org.quartz.JobKey;
040 import org.quartz.Scheduler;
041
042
046 public class MessageSenderJob implements Job {
047
048 public void execute(JobExecutionContext jobExecutionContext) {
049 try {
050 doExecute(jobExecutionContext);
051 }
052 catch (Exception e) {
053 _log.error("Unable to execute job", e);
054 }
055 }
056
057 protected void doExecute(JobExecutionContext jobExecutionContext)
058 throws Exception {
059
060 JobDetail jobDetail = jobExecutionContext.getJobDetail();
061
062 JobDataMap jobDataMap = jobDetail.getJobDataMap();
063
064 String destinationName = jobDataMap.getString(
065 SchedulerEngine.DESTINATION_NAME);
066
067 String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
068
069 Message message = null;
070
071 if (messageJSON == null) {
072 message = new Message();
073 }
074 else {
075 message = (Message)JSONFactoryUtil.deserialize(messageJSON);
076 }
077
078 message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
079
080 Map<String, Object> jobStateMap =
081 (Map<String, Object>)jobDataMap.get(SchedulerEngine.JOB_STATE);
082
083 JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
084
085 JobKey jobKey = jobDetail.getKey();
086
087 if (jobExecutionContext.getNextFireTime() == null) {
088 message.put(SchedulerEngine.DISABLE, true);
089
090 StorageType storageType = StorageType.valueOf(
091 jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
092
093 if (PropsValues.CLUSTER_LINK_ENABLED &&
094 storageType.equals(StorageType.MEMORY_CLUSTERED)) {
095
096 notifyClusterMember(jobKey, storageType);
097 }
098
099 if (storageType.equals(StorageType.PERSISTED)) {
100 Scheduler scheduler = jobExecutionContext.getScheduler();
101
102 scheduler.deleteJob(jobKey);
103 }
104 }
105
106 message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
107 message.put(SchedulerEngine.JOB_STATE, jobState);
108 message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
109
110 MessageBusUtil.sendMessage(destinationName, message);
111 }
112
113 protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
114 throws Exception {
115
116 MethodHandler methodHandler = new MethodHandler(
117 _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
118 storageType);
119
120 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
121 methodHandler, true);
122
123 ClusterExecutorUtil.execute(clusterRequest);
124 }
125
126 private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
127
128 private static MethodKey _deleteJobMethodKey = new MethodKey(
129 SchedulerEngineHelperUtil.class.getName(), "delete", String.class,
130 String.class, StorageType.class);
131
132 }