001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018 import com.liferay.portal.kernel.bean.BeanReference;
019 import com.liferay.portal.kernel.bean.IdentifiableBean;
020 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
021 import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
022 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
024 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
025 import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
027 import com.liferay.portal.kernel.cluster.ClusterRequest;
028 import com.liferay.portal.kernel.cluster.Clusterable;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031 import com.liferay.portal.kernel.messaging.Message;
032 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
033 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
035 import com.liferay.portal.kernel.scheduler.SchedulerException;
036 import com.liferay.portal.kernel.scheduler.StorageType;
037 import com.liferay.portal.kernel.scheduler.Trigger;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
041 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
042 import com.liferay.portal.kernel.util.CharPool;
043 import com.liferay.portal.kernel.util.MethodHandler;
044 import com.liferay.portal.kernel.util.MethodKey;
045 import com.liferay.portal.kernel.util.ObjectValuePair;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.scheduler.quartz.QuartzTriggerHelperUtil;
048 import com.liferay.portal.spring.aop.ServiceBeanAopProxy;
049 import com.liferay.portal.util.PropsValues;
050
051 import java.io.Serializable;
052
053 import java.util.Date;
054 import java.util.Iterator;
055 import java.util.List;
056 import java.util.Map;
057 import java.util.Set;
058 import java.util.concurrent.ConcurrentHashMap;
059 import java.util.concurrent.Future;
060 import java.util.concurrent.TimeUnit;
061 import java.util.concurrent.locks.ReadWriteLock;
062 import java.util.concurrent.locks.ReentrantReadWriteLock;
063
064 import org.springframework.aop.TargetSource;
065 import org.springframework.aop.framework.AdvisedSupport;
066
067
070 public class ClusterSchedulerEngine
071 implements IdentifiableBean, SchedulerEngine {
072
073 public static SchedulerEngine createClusterSchedulerEngine(
074 SchedulerEngine schedulerEngine) {
075
076 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
077 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
078 }
079
080 return schedulerEngine;
081 }
082
/**
 * Creates a wrapper around the given scheduler engine and prepares the
 * read/write lock pair used by this class. Both locks come from a single
 * {@link ReentrantReadWriteLock}.
 *
 * @param schedulerEngine the scheduler engine every call is delegated to
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    ReadWriteLock lock = new ReentrantReadWriteLock();

    _readLock = lock.readLock();
    _writeLock = lock.writeLock();

    _schedulerEngine = schedulerEngine;
}
091
092 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
093 @Override
094 public void delete(String groupName) throws SchedulerException {
095 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
096
097 _readLock.lock();
098
099 try {
100 if (memoryClusteredSlaveJob) {
101 removeMemoryClusteredJobs(groupName);
102 }
103 else {
104 _schedulerEngine.delete(groupName);
105 }
106 }
107 finally {
108 _readLock.unlock();
109 }
110
111 setClusterableThreadLocal(groupName);
112 }
113
114 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
115 @Override
116 public void delete(String jobName, String groupName)
117 throws SchedulerException {
118
119 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
120
121 _readLock.lock();
122
123 try {
124 if (memoryClusteredSlaveJob) {
125 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
126 }
127 else {
128 _schedulerEngine.delete(jobName, groupName);
129 }
130 }
131 finally {
132 _readLock.unlock();
133 }
134
135 setClusterableThreadLocal(groupName);
136 }
137
138 @Override
139 public String getBeanIdentifier() {
140 return _beanIdentifier;
141 }
142
143 @Clusterable(onMaster = true)
144 @Override
145 public SchedulerResponse getScheduledJob(String jobName, String groupName)
146 throws SchedulerException {
147
148 _readLock.lock();
149
150 try {
151 return _schedulerEngine.getScheduledJob(jobName, groupName);
152 }
153 finally {
154 _readLock.unlock();
155 }
156 }
157
158 @Clusterable(onMaster = true)
159 @Override
160 public List<SchedulerResponse> getScheduledJobs()
161 throws SchedulerException {
162
163 _readLock.lock();
164
165 try {
166 return _schedulerEngine.getScheduledJobs();
167 }
168 finally {
169 _readLock.unlock();
170 }
171 }
172
173 @Clusterable(onMaster = true)
174 @Override
175 public List<SchedulerResponse> getScheduledJobs(String groupName)
176 throws SchedulerException {
177
178 _readLock.lock();
179
180 try {
181 return _schedulerEngine.getScheduledJobs(groupName);
182 }
183 finally {
184 _readLock.unlock();
185 }
186 }
187
188 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
189 @Override
190 public void pause(String groupName) throws SchedulerException {
191 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
192
193 _readLock.lock();
194
195 try {
196 if (memoryClusteredSlaveJob) {
197 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
198 }
199 else {
200 _schedulerEngine.pause(groupName);
201 }
202 }
203 finally {
204 _readLock.unlock();
205 }
206
207 setClusterableThreadLocal(groupName);
208 }
209
210 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
211 @Override
212 public void pause(String jobName, String groupName)
213 throws SchedulerException {
214
215 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
216
217 _readLock.lock();
218
219 try {
220 if (memoryClusteredSlaveJob) {
221 updateMemoryClusteredJob(
222 jobName, groupName, TriggerState.PAUSED);
223 }
224 else {
225 _schedulerEngine.pause(jobName, groupName);
226 }
227 }
228 finally {
229 _readLock.unlock();
230 }
231
232 setClusterableThreadLocal(groupName);
233 }
234
235 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
236 @Override
237 public void resume(String groupName) throws SchedulerException {
238 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
239
240 _readLock.lock();
241
242 try {
243 if (memoryClusteredSlaveJob) {
244 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
245 }
246 else {
247 _schedulerEngine.resume(groupName);
248 }
249 }
250 finally {
251 _readLock.unlock();
252 }
253
254 setClusterableThreadLocal(groupName);
255 }
256
257 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
258 @Override
259 public void resume(String jobName, String groupName)
260 throws SchedulerException {
261
262 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
263
264 _readLock.lock();
265
266 try {
267 if (memoryClusteredSlaveJob) {
268 updateMemoryClusteredJob(
269 jobName, groupName, TriggerState.NORMAL);
270 }
271 else {
272 _schedulerEngine.resume(jobName, groupName);
273 }
274 }
275 finally {
276 _readLock.unlock();
277 }
278
279 setClusterableThreadLocal(groupName);
280 }
281
282 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
283 @Override
284 public void schedule(
285 Trigger trigger, String description, String destinationName,
286 Message message)
287 throws SchedulerException {
288
289 String groupName = trigger.getGroupName();
290
291 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
292 groupName);
293
294 StorageType storageType = objectValuePair.getValue();
295
296 _readLock.lock();
297
298 try {
299 if (storageType == StorageType.MEMORY_CLUSTERED) {
300 if (ClusterMasterExecutorUtil.isMaster()) {
301 _schedulerEngine.schedule(
302 trigger, description, destinationName, message);
303
304 if (_portalReady) {
305 _notifySlave(
306 trigger, description, destinationName, message);
307 }
308 }
309 }
310 else {
311 _schedulerEngine.schedule(
312 trigger, description, destinationName, message);
313 }
314 }
315 finally {
316 _readLock.unlock();
317 }
318
319 setClusterableThreadLocal(groupName);
320 }
321
322 @Override
323 public void setBeanIdentifier(String beanIdentifier) {
324 _beanIdentifier = beanIdentifier;
325 }
326
327 @Override
328 public void shutdown() throws SchedulerException {
329 _portalReady = false;
330
331 ClusterMasterExecutorUtil.
332 unregisterClusterMasterTokenTransitionListener(
333 _schedulerClusterMasterTokenTransitionListener);
334
335 _schedulerEngine.shutdown();
336 }
337
338 @Override
339 public void start() throws SchedulerException {
340 try {
341 if (!ClusterMasterExecutorUtil.isMaster()) {
342 initMemoryClusteredJobs();
343 }
344
345 _schedulerClusterMasterTokenTransitionListener =
346 new SchedulerClusterMasterTokenTransitionListener();
347
348 ClusterMasterExecutorUtil.
349 registerClusterMasterTokenTransitionListener(
350 _schedulerClusterMasterTokenTransitionListener);
351 }
352 catch (Exception e) {
353 throw new SchedulerException("Unable to initialize scheduler", e);
354 }
355
356 _schedulerEngine.start();
357
358 _portalReady = true;
359 }
360
361 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
362 @Override
363 public void suppressError(String jobName, String groupName)
364 throws SchedulerException {
365
366 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
367
368 if (!memoryClusteredSlaveJob) {
369 _readLock.lock();
370
371 try {
372 _schedulerEngine.suppressError(jobName, groupName);
373 }
374 finally {
375 _readLock.unlock();
376 }
377 }
378
379 setClusterableThreadLocal(groupName);
380 }
381
382 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
383 @Override
384 public void unschedule(String groupName) throws SchedulerException {
385 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
386
387 _readLock.lock();
388
389 try {
390 if (memoryClusteredSlaveJob) {
391 removeMemoryClusteredJobs(groupName);
392 }
393 else {
394 _schedulerEngine.unschedule(groupName);
395 }
396 }
397 finally {
398 _readLock.unlock();
399 }
400
401 setClusterableThreadLocal(groupName);
402 }
403
404 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
405 @Override
406 public void unschedule(String jobName, String groupName)
407 throws SchedulerException {
408
409 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
410
411 _readLock.lock();
412
413 try {
414 if (memoryClusteredSlaveJob) {
415 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
416 }
417 else {
418 _schedulerEngine.unschedule(jobName, groupName);
419 }
420 }
421 finally {
422 _readLock.unlock();
423 }
424
425 setClusterableThreadLocal(groupName);
426 }
427
428 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
429 @Override
430 public void update(Trigger trigger) throws SchedulerException {
431 String jobName = trigger.getJobName();
432 String groupName = trigger.getGroupName();
433
434 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
435
436 _readLock.lock();
437
438 try {
439 if (memoryClusteredSlaveJob) {
440 boolean updated = false;
441
442 for (ObjectValuePair<SchedulerResponse, TriggerState>
443 memoryClusteredJob : _memoryClusteredJobs.values()) {
444
445 SchedulerResponse schedulerResponse =
446 memoryClusteredJob.getKey();
447
448 if (jobName.equals(schedulerResponse.getJobName()) &&
449 groupName.equals(schedulerResponse.getGroupName())) {
450
451 schedulerResponse.setTrigger(trigger);
452
453 updated = true;
454
455 break;
456 }
457 }
458
459 if (!updated) {
460 throw new SchedulerException(
461 "Unable to update trigger for memory clustered job");
462 }
463 }
464 else {
465 _schedulerEngine.update(trigger);
466 }
467 }
468 finally {
469 _readLock.unlock();
470 }
471
472 setClusterableThreadLocal(groupName);
473 }
474
475 protected void addMemoryClusteredJob(SchedulerResponse schedulerResponse)
476 throws SchedulerException {
477
478 Trigger oldTrigger = schedulerResponse.getTrigger();
479
480 String jobName = schedulerResponse.getJobName();
481 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
482 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
483
484 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
485 oldTrigger.getTriggerType(), jobName, groupName,
486 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
487 oldTrigger.getTriggerContent());
488
489 schedulerResponse.setTrigger(newTrigger);
490
491 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
492 schedulerResponse);
493
494 Message message = schedulerResponse.getMessage();
495
496 message.remove(JOB_STATE);
497
498 _memoryClusteredJobs.put(
499 getFullName(jobName, groupName),
500 new ObjectValuePair<SchedulerResponse, TriggerState>(
501 schedulerResponse, triggerState));
502 }
503
504 protected String getFullName(String jobName, String groupName) {
505 return groupName.concat(StringPool.PERIOD).concat(jobName);
506 }
507
508 protected void initMemoryClusteredJobs() throws Exception {
509 MethodHandler methodHandler = new MethodHandler(
510 _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
511
512 Future<List<SchedulerResponse>> future =
513 ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
514
515 List<SchedulerResponse> schedulerResponses = future.get(
516 PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
517 TimeUnit.SECONDS);
518
519 for (SchedulerResponse schedulerResponse : schedulerResponses) {
520 addMemoryClusteredJob(schedulerResponse);
521 }
522 }
523
524 protected boolean isMemoryClusteredSlaveJob(String groupName)
525 throws SchedulerException {
526
527 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
528 groupName);
529
530 StorageType storageType = objectValuePair.getValue();
531
532 if ((storageType != StorageType.MEMORY_CLUSTERED) ||
533 ClusterMasterExecutorUtil.isMaster()) {
534
535 return false;
536 }
537
538 return true;
539 }
540
541 protected void removeMemoryClusteredJobs(String groupName) {
542 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
543 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
544
545 Iterator
546 <Map.Entry<String,
547 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
548 memoryClusteredJobs.iterator();
549
550 while (itr.hasNext()) {
551 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
552 entry = itr.next();
553
554 ObjectValuePair<SchedulerResponse, TriggerState>
555 memoryClusteredJob = entry.getValue();
556
557 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
558
559 if (groupName.equals(schedulerResponse.getGroupName())) {
560 itr.remove();
561 }
562 }
563 }
564
565 protected ObjectValuePair<String, StorageType> resolveGroupName(
566 String groupName) {
567
568 int index = groupName.indexOf(CharPool.POUND);
569
570 String storageTypeString = groupName.substring(0, index);
571
572 StorageType storageType = StorageType.valueOf(storageTypeString);
573
574 String orginalGroupName = groupName.substring(index + 1);
575
576 return new ObjectValuePair<String, StorageType>(
577 orginalGroupName, storageType);
578 }
579
580 protected void setClusterableThreadLocal(String groupName) {
581 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
582 groupName);
583
584 ClusterableContextThreadLocal.putThreadLocalContext(
585 STORAGE_TYPE, objectValuePair.getValue());
586 ClusterableContextThreadLocal.putThreadLocalContext(
587 _PORTAL_READY, _portalReady);
588
589 boolean pluginReady = true;
590
591 if (PluginContextLifecycleThreadLocal.isInitializing() ||
592 PluginContextLifecycleThreadLocal.isDestroying()) {
593
594 pluginReady = false;
595 }
596
597 ClusterableContextThreadLocal.putThreadLocalContext(
598 _PLUGIN_READY, pluginReady);
599 }
600
601 protected void updateMemoryClusteredJob(
602 String jobName, String groupName, TriggerState triggerState) {
603
604 ObjectValuePair<SchedulerResponse, TriggerState>
605 memoryClusteredJob = _memoryClusteredJobs.get(
606 getFullName(jobName, groupName));
607
608 if (memoryClusteredJob != null) {
609 memoryClusteredJob.setValue(triggerState);
610 }
611 }
612
613 protected void updateMemoryClusteredJobs(
614 String groupName, TriggerState triggerState) {
615
616 for (ObjectValuePair<SchedulerResponse, TriggerState>
617 memoryClusteredJob : _memoryClusteredJobs.values()) {
618
619 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
620
621 if (groupName.equals(schedulerResponse.getGroupName())) {
622 memoryClusteredJob.setValue(triggerState);
623 }
624 }
625 }
626
627 private static void _addMemoryClusteredJob(
628 SchedulerResponse schedulerResponse, String beanIdentifier)
629 throws Exception {
630
631 Object serviceBean = PortalBeanLocatorUtil.locate(beanIdentifier);
632
633 AdvisedSupport advisedSupport = ServiceBeanAopProxy.getAdvisedSupport(
634 serviceBean);
635
636 TargetSource targetSource = advisedSupport.getTargetSource();
637
638 ClusterSchedulerEngine clusterSchedulerEngine =
639 (ClusterSchedulerEngine)targetSource.getTarget();
640
641 if (!clusterSchedulerEngine._portalReady) {
642 return;
643 }
644
645 String jobName = schedulerResponse.getJobName();
646 String groupName = schedulerResponse.getGroupName();
647
648 java.util.concurrent.locks.Lock readLock =
649 clusterSchedulerEngine._readLock;
650
651 readLock.lock();
652
653 try {
654 Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
655 memoryClusteredJobs =
656 clusterSchedulerEngine._memoryClusteredJobs;
657
658 memoryClusteredJobs.put(
659 clusterSchedulerEngine.getFullName(jobName, groupName),
660 new ObjectValuePair<SchedulerResponse, TriggerState>(
661 schedulerResponse, TriggerState.NORMAL));
662
663 if (_log.isInfoEnabled()) {
664 _log.info(
665 "Receive notification from master, add memory clustered " +
666 "job " + schedulerResponse);
667 }
668 }
669 finally {
670 readLock.unlock();
671 }
672 }
673
674 private void _notifySlave(
675 Trigger trigger, String description, String destinationName,
676 Message message) {
677
678 String groupName = trigger.getGroupName();
679 String jobName = trigger.getJobName();
680
681 SchedulerResponse schedulerResponse = new SchedulerResponse();
682
683 schedulerResponse.setDescription(description);
684 schedulerResponse.setDestinationName(destinationName);
685 schedulerResponse.setGroupName(groupName);
686 schedulerResponse.setJobName(jobName);
687 schedulerResponse.setTrigger(trigger);
688 schedulerResponse.setMessage(message);
689
690 try {
691 MethodHandler methodHandler = new MethodHandler(
692 _addMemoryClusteredJobMethodKey, schedulerResponse,
693 _beanIdentifier);
694
695 ClusterRequest clusterRequest =
696 ClusterRequest.createMulticastRequest(methodHandler, true);
697
698 clusterRequest.setFireAndForget(true);
699
700 ClusterExecutorUtil.execute(clusterRequest);
701 }
702 catch (Throwable t) {
703 _log.error("Unable to notify slave", t);
704 }
705 }
706
707 @BeanReference(
708 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
709 protected SchedulerEngine schedulerEngine;
710
711 private static final String _PLUGIN_READY = "plugin.ready";
712
713 private static final String _PORTAL_READY = "portal.ready";
714
715 private static Log _log = LogFactoryUtil.getLog(
716 ClusterSchedulerEngine.class);
717
718 private static final MethodKey _addMemoryClusteredJobMethodKey =
719 new MethodKey(
720 ClusterSchedulerEngine.class, "_addMemoryClusteredJob",
721 SchedulerResponse.class, String.class);
722 private static final MethodKey _getScheduledJobMethodKey = new MethodKey(
723 SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
724 String.class, StorageType.class);
725 private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
726 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
727
728 private String _beanIdentifier;
729 private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
730 _memoryClusteredJobs = new ConcurrentHashMap
731 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
732 private boolean _portalReady;
733 private final java.util.concurrent.locks.Lock _readLock;
734 private ClusterMasterTokenTransitionListener
735 _schedulerClusterMasterTokenTransitionListener;
736 private final SchedulerEngine _schedulerEngine;
737 private final java.util.concurrent.locks.Lock _writeLock;
738
739 private static class SchedulerClusterInvokeAcceptor
740 implements ClusterInvokeAcceptor {
741
742 @Override
743 public boolean accept(Map<String, Serializable> context) {
744 if (ClusterInvokeThreadLocal.isEnabled()) {
745 return false;
746 }
747
748 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
749 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
750 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
751
752 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
753 !pluginReady) {
754
755 return false;
756 }
757
758 return true;
759 }
760
761 }
762
763 private class SchedulerClusterMasterTokenTransitionListener
764 extends BaseClusterMasterTokenTransitionListener {
765
766 @Override
767 protected void doMasterTokenAcquired() throws Exception {
768 boolean forceSync = ProxyModeThreadLocal.isForceSync();
769
770 ProxyModeThreadLocal.setForceSync(true);
771
772 _writeLock.lock();
773
774 try {
775 for (ObjectValuePair<SchedulerResponse, TriggerState>
776 memoryClusteredJob : _memoryClusteredJobs.values()) {
777
778 SchedulerResponse schedulerResponse =
779 memoryClusteredJob.getKey();
780
781 Trigger oldTrigger = schedulerResponse.getTrigger();
782
783 Date startDate = QuartzTriggerHelperUtil.getFireTimeAfter(
784 oldTrigger, new Date());
785
786 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
787 oldTrigger.getTriggerType(), oldTrigger.getJobName(),
788 oldTrigger.getGroupName(), startDate,
789 oldTrigger.getEndDate(),
790 oldTrigger.getTriggerContent());
791
792 _schedulerEngine.schedule(
793 newTrigger, schedulerResponse.getDescription(),
794 schedulerResponse.getDestinationName(),
795 schedulerResponse.getMessage());
796
797 TriggerState triggerState = memoryClusteredJob.getValue();
798
799 if (triggerState.equals(TriggerState.PAUSED)) {
800 _schedulerEngine.pause(
801 schedulerResponse.getJobName(),
802 schedulerResponse.getGroupName());
803 }
804 }
805
806 _memoryClusteredJobs.clear();
807
808 if (_log.isInfoEnabled()) {
809 _log.info("MEMORY_CLUSTERED jobs are running on this node");
810 }
811 }
812 finally {
813 ProxyModeThreadLocal.setForceSync(forceSync);
814
815 _writeLock.unlock();
816 }
817 }
818
819 @Override
820 protected void doMasterTokenReleased() throws Exception {
821 _writeLock.lock();
822
823 try {
824 for (SchedulerResponse schedulerResponse :
825 _schedulerEngine.getScheduledJobs()) {
826
827 if (StorageType.MEMORY_CLUSTERED ==
828 schedulerResponse.getStorageType()) {
829
830 String groupName =
831 SchedulerEngineHelperUtil.namespaceGroupName(
832 schedulerResponse.getGroupName(),
833 StorageType.MEMORY_CLUSTERED);
834
835 _schedulerEngine.delete(
836 schedulerResponse.getJobName(), groupName);
837 }
838 }
839
840 initMemoryClusteredJobs();
841
842 if (_log.isInfoEnabled()) {
843 _log.info(
844 "MEMORY_CLUSTERED jobs stop running on this node");
845 }
846 }
847 finally {
848 _writeLock.unlock();
849 }
850 }
851
852 }
853
854 }