001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.IdentifiableBean;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterEvent;
021 import com.liferay.portal.kernel.cluster.ClusterEventListener;
022 import com.liferay.portal.kernel.cluster.ClusterEventType;
023 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.Clusterable;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.messaging.Message;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037 import com.liferay.portal.kernel.scheduler.SchedulerException;
038 import com.liferay.portal.kernel.scheduler.StorageType;
039 import com.liferay.portal.kernel.scheduler.Trigger;
040 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041 import com.liferay.portal.kernel.scheduler.TriggerState;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.Base64;
044 import com.liferay.portal.kernel.util.CharPool;
045 import com.liferay.portal.kernel.util.MethodHandler;
046 import com.liferay.portal.kernel.util.MethodKey;
047 import com.liferay.portal.kernel.util.ObjectValuePair;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050 import com.liferay.portal.model.Lock;
051 import com.liferay.portal.service.LockLocalServiceUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.io.ObjectInputStream;
055 import java.io.ObjectOutputStream;
056
057 import java.util.Collections;
058 import java.util.Iterator;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Set;
062 import java.util.concurrent.ConcurrentHashMap;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.locks.ReadWriteLock;
065 import java.util.concurrent.locks.ReentrantReadWriteLock;
066
067
070 public class ClusterSchedulerEngine
071 implements IdentifiableBean, SchedulerEngine,
072 SchedulerEngineClusterManager {
073
074 public static SchedulerEngine createClusterSchedulerEngine(
075 SchedulerEngine schedulerEngine) {
076
077 if (PropsValues.CLUSTER_LINK_ENABLED) {
078 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
079 }
080
081 return schedulerEngine;
082 }
083
/**
 * Creates a cluster scheduler engine that delegates to the given engine.
 *
 * @param schedulerEngine the wrapped scheduler engine that performs the
 *        actual scheduling
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
	this._schedulerEngine = schedulerEngine;
}
087
088 @Clusterable
089 public void delete(String groupName) throws SchedulerException {
090 if (!PropsValues.SCHEDULER_ENABLED) {
091 return;
092 }
093
094 try {
095 if (isMemorySchedulerSlave(groupName)) {
096 removeMemoryClusteredJobs(groupName);
097
098 return;
099 }
100 }
101 catch (Exception e) {
102 throw new SchedulerException(
103 "Unable to delete jobs in group " + groupName, e);
104 }
105
106 _readLock.lock();
107
108 try {
109 _schedulerEngine.delete(groupName);
110 }
111 finally {
112 _readLock.unlock();
113 }
114
115 skipClusterInvoking(groupName);
116 }
117
118 @Clusterable
119 public void delete(String jobName, String groupName)
120 throws SchedulerException {
121
122 if (!PropsValues.SCHEDULER_ENABLED) {
123 return;
124 }
125
126 try {
127 if (isMemorySchedulerSlave(groupName)) {
128 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
129
130 return;
131 }
132 }
133 catch (Exception e) {
134 throw new SchedulerException(
135 "Unable to delete job {jobName=" + jobName + ", groupName=" +
136 groupName + "}",
137 e);
138 }
139
140 _readLock.lock();
141
142 try {
143 _schedulerEngine.delete(jobName, groupName);
144 }
145 finally {
146 _readLock.unlock();
147 }
148
149 skipClusterInvoking(groupName);
150 }
151
152 public String getBeanIdentifier() {
153 return _beanIdentifier;
154 }
155
156 public SchedulerResponse getScheduledJob(String jobName, String groupName)
157 throws SchedulerException {
158
159 if (!PropsValues.SCHEDULER_ENABLED) {
160 return null;
161 }
162
163 try {
164 if (isMemorySchedulerSlave(groupName)) {
165 return (SchedulerResponse)callMaster(
166 _getScheduledJobMethodKey, jobName, groupName);
167 }
168 }
169 catch (Exception e) {
170 throw new SchedulerException(
171 "Unable to get job {jobName=" + jobName + ", groupName=" +
172 groupName + "}",
173 e);
174 }
175
176 _readLock.lock();
177
178 try {
179 return _schedulerEngine.getScheduledJob(jobName, groupName);
180 }
181 finally {
182 _readLock.unlock();
183 }
184 }
185
186 public List<SchedulerResponse> getScheduledJobs()
187 throws SchedulerException {
188
189 if (!PropsValues.SCHEDULER_ENABLED) {
190 return Collections.emptyList();
191 }
192
193 try {
194 if (isMemorySchedulerSlave()) {
195 return (List<SchedulerResponse>)callMaster(
196 _getScheduledJobsMethodKey1);
197 }
198 }
199 catch (Exception e) {
200 throw new SchedulerException("Unable to get jobs", e);
201 }
202
203 _readLock.lock();
204
205 try {
206 return _schedulerEngine.getScheduledJobs();
207 }
208 finally {
209 _readLock.unlock();
210 }
211 }
212
213 public List<SchedulerResponse> getScheduledJobs(String groupName)
214 throws SchedulerException {
215
216 if (!PropsValues.SCHEDULER_ENABLED) {
217 return Collections.emptyList();
218 }
219
220 try {
221 if (isMemorySchedulerSlave(groupName)) {
222 return (List<SchedulerResponse>)callMaster(
223 _getScheduledJobsMethodKey2, groupName);
224 }
225 }
226 catch (Exception e) {
227 throw new SchedulerException(
228 "Unable to get jobs in group " + groupName, e);
229 }
230
231 _readLock.lock();
232
233 try {
234 return _schedulerEngine.getScheduledJobs(groupName);
235 }
236 finally {
237 _readLock.unlock();
238 }
239 }
240
241 @Clusterable
242 public void pause(String groupName) throws SchedulerException {
243 if (!PropsValues.SCHEDULER_ENABLED) {
244 return;
245 }
246
247 try {
248 if (isMemorySchedulerSlave(groupName)) {
249 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
250
251 return;
252 }
253 }
254 catch (Exception e) {
255 throw new SchedulerException(
256 "Unable to pause jobs in group " + groupName, e);
257 }
258
259 _readLock.lock();
260
261 try {
262 _schedulerEngine.pause(groupName);
263 }
264 finally {
265 _readLock.unlock();
266 }
267
268 skipClusterInvoking(groupName);
269 }
270
271 @Clusterable
272 public void pause(String jobName, String groupName)
273 throws SchedulerException {
274
275 if (!PropsValues.SCHEDULER_ENABLED) {
276 return;
277 }
278
279 try {
280 if (isMemorySchedulerSlave(groupName)) {
281 updateMemoryClusteredJob(
282 jobName, groupName, TriggerState.PAUSED);
283
284 return;
285 }
286 }
287 catch (Exception e) {
288 throw new SchedulerException(
289 "Unable to pause job {jobName=" + jobName + ", groupName=" +
290 groupName + "}",
291 e);
292 }
293
294 _readLock.lock();
295
296 try {
297 _schedulerEngine.pause(jobName, groupName);
298 }
299 finally {
300 _readLock.unlock();
301 }
302
303 skipClusterInvoking(groupName);
304 }
305
306 @Clusterable
307 public void resume(String groupName) throws SchedulerException {
308 if (!PropsValues.SCHEDULER_ENABLED) {
309 return;
310 }
311
312 try {
313 if (isMemorySchedulerSlave(groupName)) {
314 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
315
316 return;
317 }
318 }
319 catch (Exception e) {
320 throw new SchedulerException(
321 "Unable to resume jobs in group " + groupName, e);
322 }
323
324 _readLock.lock();
325
326 try {
327 _schedulerEngine.resume(groupName);
328 }
329 finally {
330 _readLock.unlock();
331 }
332
333 skipClusterInvoking(groupName);
334 }
335
336 @Clusterable
337 public void resume(String jobName, String groupName)
338 throws SchedulerException {
339
340 if (!PropsValues.SCHEDULER_ENABLED) {
341 return;
342 }
343
344 try {
345 if (isMemorySchedulerSlave(groupName)) {
346 updateMemoryClusteredJob(
347 jobName, groupName, TriggerState.NORMAL);
348
349 return;
350 }
351 }
352 catch (Exception e) {
353 throw new SchedulerException(
354 "Unable to resume job {jobName=" + jobName + ", groupName=" +
355 groupName + "}",
356 e);
357 }
358
359 _readLock.lock();
360
361 try {
362 _schedulerEngine.resume(jobName, groupName);
363 }
364 finally {
365 _readLock.unlock();
366 }
367
368 skipClusterInvoking(groupName);
369 }
370
371 @Clusterable
372 public void schedule(
373 Trigger trigger, String description, String destinationName,
374 Message message)
375 throws SchedulerException {
376
377 if (!PropsValues.SCHEDULER_ENABLED) {
378 return;
379 }
380
381 String groupName = trigger.getGroupName();
382 String jobName = trigger.getJobName();
383
384 try {
385 if (isMemorySchedulerSlave(groupName)) {
386 SchedulerResponse schedulerResponse = new SchedulerResponse();
387
388 schedulerResponse.setDescription(description);
389 schedulerResponse.setDestinationName(destinationName);
390 schedulerResponse.setGroupName(groupName);
391 schedulerResponse.setJobName(jobName);
392 schedulerResponse.setMessage(message);
393 schedulerResponse.setTrigger(trigger);
394
395 _memoryClusteredJobs.put(
396 getFullName(jobName, groupName),
397 new ObjectValuePair<SchedulerResponse, TriggerState>(
398 schedulerResponse, TriggerState.NORMAL));
399
400 return;
401 }
402 }
403 catch (Exception e) {
404 throw new SchedulerException(
405 "Unable to schedule job {jobName=" + jobName + ", groupName=" +
406 groupName + "}",
407 e);
408 }
409
410 _readLock.lock();
411
412 try {
413 _schedulerEngine.schedule(
414 trigger, description, destinationName, message);
415 }
416 finally {
417 _readLock.unlock();
418 }
419
420 skipClusterInvoking(groupName);
421 }
422
423 public void setBeanIdentifier(String beanIdentifier) {
424 _beanIdentifier = beanIdentifier;
425 }
426
427 public void shutdown() throws SchedulerException {
428 if (!PropsValues.SCHEDULER_ENABLED) {
429 return;
430 }
431
432 try {
433 ClusterExecutorUtil.removeClusterEventListener(
434 _clusterEventListener);
435
436 LockLocalServiceUtil.unlock(
437 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
438 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
439 }
440 catch (Exception e) {
441 throw new SchedulerException("Unable to shutdown scheduler", e);
442 }
443
444 _schedulerEngine.shutdown();
445 }
446
447 public void start() throws SchedulerException {
448 if (!PropsValues.SCHEDULER_ENABLED) {
449 return;
450 }
451
452 try {
453 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
454
455 _readLock = readWriteLock.readLock();
456 _writeLock = readWriteLock.writeLock();
457
458 _localClusterNodeAddress = getSerializedString(
459 ClusterExecutorUtil.getLocalClusterNodeAddress());
460
461 _clusterEventListener = new MemorySchedulerClusterEventListener();
462
463 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
464
465 if (!isMemorySchedulerClusterLockOwner(
466 lockMemorySchedulerCluster(null))) {
467
468 initMemoryClusteredJobs();
469 }
470 }
471 catch (Exception e) {
472 throw new SchedulerException("Unable to start scheduler", e);
473 }
474
475 _schedulerEngine.start();
476 }
477
478 @Clusterable
479 public void suppressError(String jobName, String groupName)
480 throws SchedulerException {
481
482 if (!PropsValues.SCHEDULER_ENABLED) {
483 return;
484 }
485
486 try {
487 if (isMemorySchedulerSlave(groupName)) {
488 return;
489 }
490 }
491 catch (Exception e) {
492 throw new SchedulerException(
493 "Unable to suppress error for job {jobName=" + jobName +
494 ", groupName=" + groupName + "}",
495 e);
496 }
497
498 _readLock.lock();
499
500 try {
501 _schedulerEngine.suppressError(jobName, groupName);
502 }
503 finally {
504 _readLock.unlock();
505 }
506
507 skipClusterInvoking(groupName);
508 }
509
510 @Clusterable
511 public void unschedule(String groupName) throws SchedulerException {
512 if (!PropsValues.SCHEDULER_ENABLED) {
513 return;
514 }
515
516 try {
517 if (isMemorySchedulerSlave(groupName)) {
518 removeMemoryClusteredJobs(groupName);
519
520 return;
521 }
522 }
523 catch (Exception e) {
524 throw new SchedulerException(
525 "Unable to unschedule jobs in group " + groupName, e);
526 }
527
528 _readLock.lock();
529
530 try {
531 _schedulerEngine.unschedule(groupName);
532 }
533 finally {
534 _readLock.unlock();
535 }
536
537 skipClusterInvoking(groupName);
538 }
539
540 @Clusterable
541 public void unschedule(String jobName, String groupName)
542 throws SchedulerException {
543
544 if (!PropsValues.SCHEDULER_ENABLED) {
545 return;
546 }
547
548 try {
549 if (isMemorySchedulerSlave(groupName)) {
550 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
551
552 return;
553 }
554 }
555 catch (Exception e) {
556 throw new SchedulerException(
557 "Unable to unschedule job {jobName=" + jobName +
558 ", groupName=" + groupName + "}",
559 e);
560 }
561
562 _readLock.lock();
563
564 try {
565 _schedulerEngine.unschedule(jobName, groupName);
566 }
567 finally {
568 _readLock.unlock();
569 }
570
571 skipClusterInvoking(groupName);
572 }
573
574 @Clusterable
575 public void update(Trigger trigger) throws SchedulerException {
576 if (!PropsValues.SCHEDULER_ENABLED) {
577 return;
578 }
579
580 String jobName = trigger.getJobName();
581 String groupName = trigger.getGroupName();
582
583 try {
584 if (isMemorySchedulerSlave(groupName)) {
585 for (ObjectValuePair<SchedulerResponse, TriggerState>
586 memoryClusteredJob : _memoryClusteredJobs.values()) {
587
588 SchedulerResponse schedulerResponse =
589 memoryClusteredJob.getKey();
590
591 if (jobName.equals(schedulerResponse.getJobName()) &&
592 groupName.equals(schedulerResponse.getGroupName())) {
593
594 schedulerResponse.setTrigger(trigger);
595
596 return;
597 }
598 }
599
600 throw new Exception(
601 "Unable to update trigger for memory clustered job");
602 }
603 }
604 catch (Exception e) {
605 throw new SchedulerException(
606 "Unable to update job {jobName=" + jobName + ", groupName=" +
607 groupName + "}",
608 e);
609 }
610
611 _readLock.lock();
612
613 try {
614 _schedulerEngine.update(trigger);
615 }
616 finally {
617 _readLock.unlock();
618 }
619
620 skipClusterInvoking(groupName);
621 }
622
623 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
624 try {
625 Lock lock = lockMemorySchedulerCluster(null);
626
627 Address address = (Address)getDeserializedObject(lock.getOwner());
628
629 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
630 return lock;
631 }
632
633 return lockMemorySchedulerCluster(lock.getOwner());
634 }
635 catch (Exception e) {
636 throw new SchedulerException(
637 "Unable to update memory scheduler cluster master", e);
638 }
639 }
640
641 protected Object callMaster(MethodKey methodKey, Object... arguments)
642 throws Exception {
643
644 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
645
646 Lock lock = updateMemorySchedulerClusterMaster();
647
648 Address address = (Address)getDeserializedObject(lock.getOwner());
649
650 if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
651 if (methodKey == _getScheduledJobsMethodKey3) {
652 return methodHandler.invoke(false);
653 }
654 else {
655 return methodHandler.invoke(schedulerEngine);
656 }
657 }
658
659 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
660 methodHandler, address);
661
662 clusterRequest.setBeanIdentifier(_beanIdentifier);
663
664 FutureClusterResponses futureClusterResponses =
665 ClusterExecutorUtil.execute(clusterRequest);
666
667 try {
668 ClusterNodeResponses clusterNodeResponses =
669 futureClusterResponses.get(20, TimeUnit.SECONDS);
670
671 ClusterNodeResponse clusterNodeResponse =
672 clusterNodeResponses.getClusterResponse(address);
673
674 return clusterNodeResponse.getResult();
675 }
676 catch (Exception e) {
677 throw new SchedulerException(
678 "Unable to load scheduled jobs from cluster node " +
679 address.getDescription(),
680 e);
681 }
682 }
683
684 protected Object getDeserializedObject(String string) throws Exception {
685 byte[] bytes = Base64.decode(string);
686
687 UnsyncByteArrayInputStream byteArrayInputStream =
688 new UnsyncByteArrayInputStream(bytes);
689
690 ObjectInputStream objectInputStream = new ObjectInputStream(
691 byteArrayInputStream);
692
693 Object object = objectInputStream.readObject();
694
695 objectInputStream.close();
696
697 return object;
698 }
699
700 protected String getFullName(String jobName, String groupName) {
701 return groupName.concat(StringPool.PERIOD).concat(jobName);
702 }
703
704 protected String getSerializedString(Object object) throws Exception {
705 UnsyncByteArrayOutputStream byteArrayOutputStream =
706 new UnsyncByteArrayOutputStream();
707
708 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
709 byteArrayOutputStream);
710
711 objectOutputStream.writeObject(object);
712 objectOutputStream.close();
713
714 byte[] bytes = byteArrayOutputStream.toByteArray();
715
716 return Base64.encode(bytes);
717 }
718
719 protected StorageType getStorageType(String groupName) {
720 int pos = groupName.indexOf(CharPool.POUND);
721
722 String storageTypeString = groupName.substring(0, pos);
723
724 return StorageType.valueOf(storageTypeString);
725 }
726
727 protected void initMemoryClusteredJobs() throws Exception {
728 List<SchedulerResponse> schedulerResponses =
729 (List<SchedulerResponse>)callMaster(
730 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
731
732 for (SchedulerResponse schedulerResponse : schedulerResponses) {
733 Trigger oldTrigger = schedulerResponse.getTrigger();
734
735 String jobName = schedulerResponse.getJobName();
736 String groupName = SchedulerEngineUtil.namespaceGroupName(
737 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
738
739 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
740 oldTrigger.getTriggerType(), jobName, groupName,
741 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
742 oldTrigger.getTriggerContent());
743
744 schedulerResponse.setTrigger(newTrigger);
745
746 TriggerState triggerState = SchedulerEngineUtil.getJobState(
747 schedulerResponse);
748
749 Message message = schedulerResponse.getMessage();
750
751 message.remove(JOB_STATE);
752
753 _memoryClusteredJobs.put(
754 getFullName(jobName, groupName),
755 new ObjectValuePair<SchedulerResponse, TriggerState>(
756 schedulerResponse, triggerState));
757 }
758 }
759
760 protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
761 throws Exception {
762
763 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
764
765 if (master == _master) {
766 return master;
767 }
768
769 if (!_master) {
770 _master = master;
771
772 return _master;
773 }
774
775 _localClusterNodeAddress = getSerializedString(
776 ClusterExecutorUtil.getLocalClusterNodeAddress());
777
778 for (ObjectValuePair<SchedulerResponse, TriggerState>
779 memoryClusteredJob : _memoryClusteredJobs.values()) {
780
781 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
782
783 _schedulerEngine.delete(
784 schedulerResponse.getJobName(),
785 schedulerResponse.getGroupName());
786 }
787
788 initMemoryClusteredJobs();
789
790 if (_log.isInfoEnabled()) {
791 _log.info("Another node is now the memory scheduler master");
792 }
793
794 _master = master;
795
796 return master;
797 }
798
799 protected boolean isMemorySchedulerSlave() throws Exception {
800 return isMemorySchedulerSlave(null);
801 }
802
803 protected boolean isMemorySchedulerSlave(String groupName)
804 throws Exception {
805
806 if (groupName != null) {
807 StorageType storageType = getStorageType(groupName);
808
809 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
810 return false;
811 }
812 }
813
814 Lock lock = lockMemorySchedulerCluster(null);
815
816 if (isMemorySchedulerClusterLockOwner(lock)) {
817 return false;
818 }
819
820 return true;
821 }
822
823 protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
824 Lock lock = null;
825
826 while (true) {
827 try {
828 if (owner == null) {
829 lock = LockLocalServiceUtil.lock(
830 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
831 _localClusterNodeAddress,
832 PropsValues.
833 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
834 }
835 else {
836 lock = LockLocalServiceUtil.lock(
837 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
838 _localClusterNodeAddress,
839 PropsValues.
840 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
841 }
842
843 break;
844 }
845 catch (Exception e) {
846 if (_log.isWarnEnabled()) {
847 _log.warn(
848 "Unable to obtain memory scheduler cluster lock. " +
849 "Trying again.");
850 }
851 }
852 }
853
854 if (!lock.isNew()) {
855 return lock;
856 }
857
858 boolean forceSync = ProxyModeThreadLocal.isForceSync();
859
860 ProxyModeThreadLocal.setForceSync(true);
861
862 _writeLock.lock();
863
864 try {
865 for (ObjectValuePair<SchedulerResponse, TriggerState>
866 memoryClusteredJob : _memoryClusteredJobs.values()) {
867
868 SchedulerResponse schedulerResponse =
869 memoryClusteredJob.getKey();
870
871 _schedulerEngine.schedule(
872 schedulerResponse.getTrigger(),
873 schedulerResponse.getDescription(),
874 schedulerResponse.getDestinationName(),
875 schedulerResponse.getMessage());
876
877 TriggerState triggerState = memoryClusteredJob.getValue();
878
879 if (triggerState.equals(TriggerState.PAUSED)) {
880 _schedulerEngine.pause(
881 schedulerResponse.getJobName(),
882 schedulerResponse.getGroupName());
883 }
884 }
885 }
886 finally {
887 ProxyModeThreadLocal.setForceSync(forceSync);
888
889 _writeLock.unlock();
890 }
891
892 return lock;
893 }
894
895 protected void removeMemoryClusteredJobs(String groupName) {
896 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
897 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
898
899 Iterator
900 <Map.Entry<String,
901 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
902 memoryClusteredJobs.iterator();
903
904 while (itr.hasNext()) {
905 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
906 entry = itr.next();
907
908 ObjectValuePair<SchedulerResponse, TriggerState>
909 memoryClusteredJob = entry.getValue();
910
911 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
912
913 if (groupName.equals(schedulerResponse.getGroupName())) {
914 itr.remove();
915 }
916 }
917 }
918
919 protected void skipClusterInvoking(String groupName)
920 throws SchedulerException {
921
922 StorageType storageType = getStorageType(groupName);
923
924 if (storageType.equals(StorageType.PERSISTED)) {
925 SchedulerException schedulerException = new SchedulerException();
926
927 schedulerException.setSwallowable(true);
928
929 throw schedulerException;
930 }
931 }
932
933 protected void updateMemoryClusteredJob(
934 String jobName, String groupName, TriggerState triggerState) {
935
936 ObjectValuePair<SchedulerResponse, TriggerState>
937 memoryClusteredJob = _memoryClusteredJobs.get(
938 getFullName(jobName, groupName));
939
940 if (memoryClusteredJob != null) {
941 memoryClusteredJob.setValue(triggerState);
942 }
943 }
944
945 protected void updateMemoryClusteredJobs(
946 String groupName, TriggerState triggerState) {
947
948 for (ObjectValuePair<SchedulerResponse, TriggerState>
949 memoryClusteredJob : _memoryClusteredJobs.values()) {
950
951 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
952
953 if (groupName.equals(schedulerResponse.getGroupName())) {
954 memoryClusteredJob.setValue(triggerState);
955 }
956 }
957 }
958
/**
 * The injected scheduler engine bean. It is the target of local invocations
 * in <code>callMaster</code> when the local node is the master.
 */
@BeanReference(
	name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
protected SchedulerEngine schedulerEngine;

// Used as both arguments identifying the memory scheduler cluster lock in
// the LockLocalServiceUtil lock and unlock calls

private static final String _LOCK_CLASS_NAME =
	SchedulerEngine.class.getName();

private static Log _log = LogFactoryUtil.getLog(
	ClusterSchedulerEngine.class);

// Keys of the methods invoked on the master through callMaster. The first
// three target SchedulerEngine; the last one targets SchedulerEngineUtil.

private static MethodKey _getScheduledJobMethodKey = new MethodKey(
	SchedulerEngine.class.getName(), "getScheduledJob", String.class,
	String.class);
private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
	SchedulerEngine.class.getName(), "getScheduledJobs");
private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
	SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
	SchedulerEngineUtil.class.getName(), "getScheduledJobs",
	StorageType.class);

private String _beanIdentifier;

// Registered in start() and removed in shutdown()

private ClusterEventListener _clusterEventListener;

// Serialized (Base64) form of the local cluster node address; compared with
// and passed as the owner of the memory scheduler cluster lock

private volatile String _localClusterNodeAddress;

// Mastership as last recorded by isMemorySchedulerClusterLockOwner

private volatile boolean _master;

// Cache of memory clustered jobs and their trigger states, keyed by the
// value of getFullName(jobName, groupName)

private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
	_memoryClusteredJobs = new ConcurrentHashMap
		<String, ObjectValuePair<SchedulerResponse, TriggerState>>();

// Read and write locks of a single ReentrantReadWriteLock created in
// start(). Delegations to the wrapped engine take the read lock;
// lockMemorySchedulerCluster takes the write lock while rescheduling.

private java.util.concurrent.locks.Lock _readLock;

// The wrapped engine that performs the actual scheduling

private SchedulerEngine _schedulerEngine;
private java.util.concurrent.locks.Lock _writeLock;
990
991 private class MemorySchedulerClusterEventListener
992 implements ClusterEventListener {
993
994 public void processClusterEvent(ClusterEvent clusterEvent) {
995 ClusterEventType clusterEventType =
996 clusterEvent.getClusterEventType();
997
998 if (!clusterEventType.equals(ClusterEventType.DEPART)) {
999 return;
1000 }
1001
1002 try {
1003 updateMemorySchedulerClusterMaster();
1004 }
1005 catch (Exception e) {
1006 _log.error("Unable to update memory scheduler cluster lock", e);
1007 }
1008 }
1009
1010 }
1011
1012 }