001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.IdentifiableBean;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterEvent;
021 import com.liferay.portal.kernel.cluster.ClusterEventListener;
022 import com.liferay.portal.kernel.cluster.ClusterEventType;
023 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.Clusterable;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.messaging.Message;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037 import com.liferay.portal.kernel.scheduler.SchedulerException;
038 import com.liferay.portal.kernel.scheduler.StorageType;
039 import com.liferay.portal.kernel.scheduler.Trigger;
040 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041 import com.liferay.portal.kernel.scheduler.TriggerState;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.Base64;
044 import com.liferay.portal.kernel.util.CharPool;
045 import com.liferay.portal.kernel.util.MethodHandler;
046 import com.liferay.portal.kernel.util.MethodKey;
047 import com.liferay.portal.kernel.util.ObjectValuePair;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050 import com.liferay.portal.model.Lock;
051 import com.liferay.portal.service.LockLocalServiceUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.io.ObjectInputStream;
055 import java.io.ObjectOutputStream;
056
057 import java.util.Iterator;
058 import java.util.List;
059 import java.util.Map;
060 import java.util.Set;
061 import java.util.concurrent.ConcurrentHashMap;
062 import java.util.concurrent.TimeUnit;
063 import java.util.concurrent.locks.ReadWriteLock;
064 import java.util.concurrent.locks.ReentrantReadWriteLock;
065
066
069 public class ClusterSchedulerEngine
070 implements IdentifiableBean, SchedulerEngine,
071 SchedulerEngineClusterManager {
072
073 public static SchedulerEngine createClusterSchedulerEngine(
074 SchedulerEngine schedulerEngine) {
075
076 if (PropsValues.CLUSTER_LINK_ENABLED) {
077 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
078 }
079
080 return schedulerEngine;
081 }
082
083 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
084 _schedulerEngine = schedulerEngine;
085 }
086
087 @Clusterable
088 public void delete(String groupName) throws SchedulerException {
089 try {
090 if (isMemorySchedulerSlave(groupName)) {
091 removeMemoryClusteredJobs(groupName);
092
093 return;
094 }
095 }
096 catch (Exception e) {
097 throw new SchedulerException(
098 "Unable to delete jobs in group " + groupName, e);
099 }
100
101 _readLock.lock();
102
103 try {
104 _schedulerEngine.delete(groupName);
105 }
106 finally {
107 _readLock.unlock();
108 }
109
110 skipClusterInvoking(groupName);
111 }
112
113 @Clusterable
114 public void delete(String jobName, String groupName)
115 throws SchedulerException {
116
117 try {
118 if (isMemorySchedulerSlave(groupName)) {
119 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
120
121 return;
122 }
123 }
124 catch (Exception e) {
125 throw new SchedulerException(
126 "Unable to delete job {jobName=" + jobName + ", groupName=" +
127 groupName + "}",
128 e);
129 }
130
131 _readLock.lock();
132
133 try {
134 _schedulerEngine.delete(jobName, groupName);
135 }
136 finally {
137 _readLock.unlock();
138 }
139
140 skipClusterInvoking(groupName);
141 }
142
143 public String getBeanIdentifier() {
144 return _beanIdentifier;
145 }
146
147 public SchedulerResponse getScheduledJob(String jobName, String groupName)
148 throws SchedulerException {
149
150 try {
151 if (isMemorySchedulerSlave(groupName)) {
152 return (SchedulerResponse)callMaster(
153 _getScheduledJobMethodKey, jobName, groupName);
154 }
155 }
156 catch (Exception e) {
157 throw new SchedulerException(
158 "Unable to get job {jobName=" + jobName + ", groupName=" +
159 groupName + "}",
160 e);
161 }
162
163 _readLock.lock();
164
165 try {
166 return _schedulerEngine.getScheduledJob(jobName, groupName);
167 }
168 finally {
169 _readLock.unlock();
170 }
171 }
172
173 public List<SchedulerResponse> getScheduledJobs()
174 throws SchedulerException {
175
176 try {
177 if (isMemorySchedulerSlave()) {
178 return (List<SchedulerResponse>)callMaster(
179 _getScheduledJobsMethodKey1);
180 }
181 }
182 catch (Exception e) {
183 throw new SchedulerException("Unable to get jobs", e);
184 }
185
186 _readLock.lock();
187
188 try {
189 return _schedulerEngine.getScheduledJobs();
190 }
191 finally {
192 _readLock.unlock();
193 }
194 }
195
196 public List<SchedulerResponse> getScheduledJobs(String groupName)
197 throws SchedulerException {
198
199 try {
200 if (isMemorySchedulerSlave(groupName)) {
201 return (List<SchedulerResponse>)callMaster(
202 _getScheduledJobsMethodKey2, groupName);
203 }
204 }
205 catch (Exception e) {
206 throw new SchedulerException(
207 "Unable to get jobs in group " + groupName, e);
208 }
209
210 _readLock.lock();
211
212 try {
213 return _schedulerEngine.getScheduledJobs(groupName);
214 }
215 finally {
216 _readLock.unlock();
217 }
218 }
219
220 @Clusterable
221 public void pause(String groupName) throws SchedulerException {
222 try {
223 if (isMemorySchedulerSlave(groupName)) {
224 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
225
226 return;
227 }
228 }
229 catch (Exception e) {
230 throw new SchedulerException(
231 "Unable to pause jobs in group " + groupName, e);
232 }
233
234 _readLock.lock();
235
236 try {
237 _schedulerEngine.pause(groupName);
238 }
239 finally {
240 _readLock.unlock();
241 }
242
243 skipClusterInvoking(groupName);
244 }
245
246 @Clusterable
247 public void pause(String jobName, String groupName)
248 throws SchedulerException {
249
250 try {
251 if (isMemorySchedulerSlave(groupName)) {
252 updateMemoryClusteredJob(
253 jobName, groupName, TriggerState.PAUSED);
254
255 return;
256 }
257 }
258 catch (Exception e) {
259 throw new SchedulerException(
260 "Unable to pause job {jobName=" + jobName + ", groupName=" +
261 groupName + "}",
262 e);
263 }
264
265 _readLock.lock();
266
267 try {
268 _schedulerEngine.pause(jobName, groupName);
269 }
270 finally {
271 _readLock.unlock();
272 }
273
274 skipClusterInvoking(groupName);
275 }
276
277 @Clusterable
278 public void resume(String groupName) throws SchedulerException {
279 try {
280 if (isMemorySchedulerSlave(groupName)) {
281 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
282
283 return;
284 }
285 }
286 catch (Exception e) {
287 throw new SchedulerException(
288 "Unable to resume jobs in group " + groupName, e);
289 }
290
291 _readLock.lock();
292
293 try {
294 _schedulerEngine.resume(groupName);
295 }
296 finally {
297 _readLock.unlock();
298 }
299
300 skipClusterInvoking(groupName);
301 }
302
303 @Clusterable
304 public void resume(String jobName, String groupName)
305 throws SchedulerException {
306
307 try {
308 if (isMemorySchedulerSlave(groupName)) {
309 updateMemoryClusteredJob(
310 jobName, groupName, TriggerState.NORMAL);
311
312 return;
313 }
314 }
315 catch (Exception e) {
316 throw new SchedulerException(
317 "Unable to resume job {jobName=" + jobName + ", groupName=" +
318 groupName + "}",
319 e);
320 }
321
322 _readLock.lock();
323
324 try {
325 _schedulerEngine.resume(jobName, groupName);
326 }
327 finally {
328 _readLock.unlock();
329 }
330
331 skipClusterInvoking(groupName);
332 }
333
334 @Clusterable
335 public void schedule(
336 Trigger trigger, String description, String destinationName,
337 Message message)
338 throws SchedulerException {
339
340 String groupName = trigger.getGroupName();
341 String jobName = trigger.getJobName();
342
343 try {
344 if (isMemorySchedulerSlave(groupName)) {
345 SchedulerResponse schedulerResponse = new SchedulerResponse();
346
347 schedulerResponse.setDescription(description);
348 schedulerResponse.setDestinationName(destinationName);
349 schedulerResponse.setGroupName(groupName);
350 schedulerResponse.setJobName(jobName);
351 schedulerResponse.setMessage(message);
352 schedulerResponse.setTrigger(trigger);
353
354 _memoryClusteredJobs.put(
355 getFullName(jobName, groupName),
356 new ObjectValuePair<SchedulerResponse, TriggerState>(
357 schedulerResponse, TriggerState.NORMAL));
358
359 return;
360 }
361 }
362 catch (Exception e) {
363 throw new SchedulerException(
364 "Unable to schedule job {jobName=" + jobName + ", groupName=" +
365 groupName + "}",
366 e);
367 }
368
369 _readLock.lock();
370
371 try {
372 _schedulerEngine.schedule(
373 trigger, description, destinationName, message);
374 }
375 finally {
376 _readLock.unlock();
377 }
378
379 skipClusterInvoking(groupName);
380 }
381
382 public void setBeanIdentifier(String beanIdentifier) {
383 _beanIdentifier = beanIdentifier;
384 }
385
386 public void shutdown() throws SchedulerException {
387 try {
388 ClusterExecutorUtil.removeClusterEventListener(
389 _clusterEventListener);
390
391 LockLocalServiceUtil.unlock(
392 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
393 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
394 }
395 catch (Exception e) {
396 throw new SchedulerException("Unable to shutdown scheduler", e);
397 }
398
399 _schedulerEngine.shutdown();
400 }
401
402 public void start() throws SchedulerException {
403 try {
404 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
405
406 _readLock = readWriteLock.readLock();
407 _writeLock = readWriteLock.writeLock();
408
409 _localClusterNodeAddress = getSerializedString(
410 ClusterExecutorUtil.getLocalClusterNodeAddress());
411
412 _clusterEventListener = new MemorySchedulerClusterEventListener();
413
414 ClusterExecutorUtil.addClusterEventListener(
415 _clusterEventListener);
416
417 if (!isMemorySchedulerClusterLockOwner(
418 lockMemorySchedulerCluster(null))) {
419
420 initMemoryClusteredJobs();
421 }
422 }
423 catch (Exception e) {
424 throw new SchedulerException("Unable to start scheduler", e);
425 }
426
427 _schedulerEngine.start();
428 }
429
430 @Clusterable
431 public void suppressError(String jobName, String groupName)
432 throws SchedulerException {
433
434 try {
435 if (isMemorySchedulerSlave(groupName)) {
436 return;
437 }
438 }
439 catch (Exception e) {
440 throw new SchedulerException(
441 "Unable to suppress error for job {jobName=" + jobName +
442 ", groupName=" + groupName + "}",
443 e);
444 }
445
446 _readLock.lock();
447
448 try {
449 _schedulerEngine.suppressError(jobName, groupName);
450 }
451 finally {
452 _readLock.unlock();
453 }
454
455 skipClusterInvoking(groupName);
456 }
457
458 @Clusterable
459 public void unschedule(String groupName) throws SchedulerException {
460 try {
461 if (isMemorySchedulerSlave(groupName)) {
462 removeMemoryClusteredJobs(groupName);
463
464 return;
465 }
466 }
467 catch (Exception e) {
468 throw new SchedulerException(
469 "Unable to unschedule jobs in group " + groupName, e);
470 }
471
472 _readLock.lock();
473
474 try {
475 _schedulerEngine.unschedule(groupName);
476 }
477 finally {
478 _readLock.unlock();
479 }
480
481 skipClusterInvoking(groupName);
482 }
483
484 @Clusterable
485 public void unschedule(String jobName, String groupName)
486 throws SchedulerException {
487
488 try {
489 if (isMemorySchedulerSlave(groupName)) {
490 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
491
492 return;
493 }
494 }
495 catch (Exception e) {
496 throw new SchedulerException(
497 "Unable to unschedule job {jobName=" + jobName +
498 ", groupName=" + groupName + "}",
499 e);
500 }
501
502 _readLock.lock();
503
504 try {
505 _schedulerEngine.unschedule(jobName, groupName);
506 }
507 finally {
508 _readLock.unlock();
509 }
510
511 skipClusterInvoking(groupName);
512 }
513
514 @Clusterable
515 public void update(Trigger trigger) throws SchedulerException {
516 String jobName = trigger.getJobName();
517 String groupName = trigger.getGroupName();
518
519 try {
520 if (isMemorySchedulerSlave(groupName)) {
521 for (ObjectValuePair<SchedulerResponse, TriggerState>
522 memoryClusteredJob : _memoryClusteredJobs.values()) {
523
524 SchedulerResponse schedulerResponse =
525 memoryClusteredJob.getKey();
526
527 if (jobName.equals(schedulerResponse.getJobName()) &&
528 groupName.equals(schedulerResponse.getGroupName())) {
529
530 schedulerResponse.setTrigger(trigger);
531
532 return;
533 }
534 }
535
536 throw new Exception(
537 "Unable to update trigger for memory clustered job");
538 }
539 }
540 catch (Exception e) {
541 throw new SchedulerException(
542 "Unable to update job {jobName=" + jobName + ", groupName=" +
543 groupName + "}",
544 e);
545 }
546
547 _readLock.lock();
548
549 try {
550 _schedulerEngine.update(trigger);
551 }
552 finally {
553 _readLock.unlock();
554 }
555
556 skipClusterInvoking(groupName);
557 }
558
559 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
560 try {
561 Lock lock = lockMemorySchedulerCluster(null);
562
563 Address address = (Address)getDeserializedObject(lock.getOwner());
564
565 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
566 return lock;
567 }
568
569 return lockMemorySchedulerCluster(lock.getOwner());
570 }
571 catch (Exception e) {
572 throw new SchedulerException(
573 "Unable to update memory scheduler cluster master", e);
574 }
575 }
576
577 protected Object callMaster(MethodKey methodKey, Object... arguments)
578 throws Exception {
579
580 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
581
582 Lock lock = updateMemorySchedulerClusterMaster();
583
584 Address address = (Address)getDeserializedObject(lock.getOwner());
585
586 if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
587 if (methodKey == _getScheduledJobsMethodKey3) {
588 return methodHandler.invoke(false);
589 }
590 else {
591 return methodHandler.invoke(schedulerEngine);
592 }
593 }
594
595 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
596 methodHandler, address);
597
598 clusterRequest.setBeanIdentifier(_beanIdentifier);
599
600 FutureClusterResponses futureClusterResponses =
601 ClusterExecutorUtil.execute(clusterRequest);
602
603 try {
604 ClusterNodeResponses clusterNodeResponses =
605 futureClusterResponses.get(20, TimeUnit.SECONDS);
606
607 ClusterNodeResponse clusterNodeResponse =
608 clusterNodeResponses.getClusterResponse(address);
609
610 return clusterNodeResponse.getResult();
611 }
612 catch (Exception e) {
613 throw new SchedulerException(
614 "Unable to load scheduled jobs from cluster node " +
615 address.getDescription(),
616 e);
617 }
618 }
619
620 protected Object getDeserializedObject(String string) throws Exception {
621 byte[] bytes = Base64.decode(string);
622
623 UnsyncByteArrayInputStream byteArrayInputStream =
624 new UnsyncByteArrayInputStream(bytes);
625
626 ObjectInputStream objectInputStream = new ObjectInputStream(
627 byteArrayInputStream);
628
629 Object object = objectInputStream.readObject();
630
631 objectInputStream.close();
632
633 return object;
634 }
635
636 protected String getFullName(String jobName, String groupName) {
637 return groupName.concat(StringPool.PERIOD).concat(jobName);
638 }
639
640 protected String getSerializedString(Object object) throws Exception {
641 UnsyncByteArrayOutputStream byteArrayOutputStream =
642 new UnsyncByteArrayOutputStream();
643
644 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
645 byteArrayOutputStream);
646
647 objectOutputStream.writeObject(object);
648 objectOutputStream.close();
649
650 byte[] bytes = byteArrayOutputStream.toByteArray();
651
652 return Base64.encode(bytes);
653 }
654
655 protected StorageType getStorageType(String groupName) {
656 int pos = groupName.indexOf(CharPool.POUND);
657
658 String storageTypeString = groupName.substring(0, pos);
659
660 return StorageType.valueOf(storageTypeString);
661 }
662
663 protected void initMemoryClusteredJobs() throws Exception {
664 List<SchedulerResponse> schedulerResponses =
665 (List<SchedulerResponse>)callMaster(
666 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
667
668 for (SchedulerResponse schedulerResponse : schedulerResponses) {
669 Trigger oldTrigger = schedulerResponse.getTrigger();
670
671 String jobName = schedulerResponse.getJobName();
672 String groupName = SchedulerEngineUtil.namespaceGroupName(
673 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
674
675 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
676 oldTrigger.getTriggerType(), jobName, groupName,
677 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
678 oldTrigger.getTriggerContent());
679
680 schedulerResponse.setTrigger(newTrigger);
681
682 TriggerState triggerState = SchedulerEngineUtil.getJobState(
683 schedulerResponse);
684
685 Message message = schedulerResponse.getMessage();
686
687 message.remove(JOB_STATE);
688
689 _memoryClusteredJobs.put(
690 getFullName(jobName, groupName),
691 new ObjectValuePair<SchedulerResponse, TriggerState>(
692 schedulerResponse, triggerState));
693 }
694 }
695
696 protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
697 throws Exception {
698
699 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
700
701 if (master == _master) {
702 return master;
703 }
704
705 if (!_master) {
706 _master = master;
707
708 return _master;
709 }
710
711 _localClusterNodeAddress = getSerializedString(
712 ClusterExecutorUtil.getLocalClusterNodeAddress());
713
714 for (ObjectValuePair<SchedulerResponse, TriggerState>
715 memoryClusteredJob : _memoryClusteredJobs.values()) {
716
717 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
718
719 _schedulerEngine.delete(
720 schedulerResponse.getJobName(),
721 schedulerResponse.getGroupName());
722 }
723
724 initMemoryClusteredJobs();
725
726 if (_log.isInfoEnabled()) {
727 _log.info("Another node is now the memory scheduler master");
728 }
729
730 _master = master;
731
732 return master;
733 }
734
735 protected boolean isMemorySchedulerSlave() throws Exception {
736 return isMemorySchedulerSlave(null);
737 }
738
739 protected boolean isMemorySchedulerSlave(String groupName)
740 throws Exception {
741
742 if (groupName != null) {
743 StorageType storageType = getStorageType(groupName);
744
745 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
746 return false;
747 }
748 }
749
750 Lock lock = lockMemorySchedulerCluster(null);
751
752 if (isMemorySchedulerClusterLockOwner(lock)) {
753 return false;
754 }
755
756 return true;
757 }
758
759 protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
760 Lock lock = null;
761
762 while (true) {
763 try {
764 if (owner == null) {
765 lock = LockLocalServiceUtil.lock(
766 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
767 _localClusterNodeAddress,
768 PropsValues.
769 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
770 }
771 else {
772 lock = LockLocalServiceUtil.lock(
773 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
774 _localClusterNodeAddress,
775 PropsValues.
776 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
777 }
778
779 break;
780 }
781 catch (Exception e) {
782 if (_log.isWarnEnabled()) {
783 _log.warn(
784 "Unable to obtain memory scheduler cluster lock. " +
785 "Trying again.");
786 }
787 }
788 }
789
790 if (!lock.isNew()) {
791 return lock;
792 }
793
794 boolean forceSync = ProxyModeThreadLocal.isForceSync();
795
796 ProxyModeThreadLocal.setForceSync(true);
797
798 _writeLock.lock();
799
800 try {
801 for (ObjectValuePair<SchedulerResponse, TriggerState>
802 memoryClusteredJob : _memoryClusteredJobs.values()) {
803
804 SchedulerResponse schedulerResponse =
805 memoryClusteredJob.getKey();
806
807 _schedulerEngine.schedule(
808 schedulerResponse.getTrigger(),
809 schedulerResponse.getDescription(),
810 schedulerResponse.getDestinationName(),
811 schedulerResponse.getMessage());
812
813 TriggerState triggerState = memoryClusteredJob.getValue();
814
815 if (triggerState.equals(TriggerState.PAUSED)) {
816 _schedulerEngine.pause(
817 schedulerResponse.getJobName(),
818 schedulerResponse.getGroupName());
819 }
820 }
821 }
822 finally {
823 ProxyModeThreadLocal.setForceSync(forceSync);
824
825 _writeLock.unlock();
826 }
827
828 return lock;
829 }
830
831 protected void removeMemoryClusteredJobs(String groupName) {
832 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
833 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
834
835 Iterator
836 <Map.Entry<String,
837 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
838 memoryClusteredJobs.iterator();
839
840 while (itr.hasNext()) {
841 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
842 entry = itr.next();
843
844 ObjectValuePair<SchedulerResponse, TriggerState>
845 memoryClusteredJob = entry.getValue();
846
847 SchedulerResponse schedulerResponse =
848 memoryClusteredJob.getKey();
849
850 if (groupName.equals(schedulerResponse.getGroupName())) {
851 itr.remove();
852 }
853 }
854 }
855
856 protected void skipClusterInvoking(String groupName)
857 throws SchedulerException {
858
859 StorageType storageType = getStorageType(groupName);
860
861 if (storageType.equals(StorageType.PERSISTED)) {
862 SchedulerException schedulerException = new SchedulerException();
863
864 schedulerException.setSwallowable(true);
865
866 throw schedulerException;
867 }
868 }
869
870 protected void updateMemoryClusteredJob(
871 String jobName, String groupName, TriggerState triggerState) {
872
873 ObjectValuePair<SchedulerResponse, TriggerState>
874 memoryClusteredJob = _memoryClusteredJobs.get(
875 getFullName(jobName, groupName));
876
877 if (memoryClusteredJob != null) {
878 memoryClusteredJob.setValue(triggerState);
879 }
880 }
881
882 protected void updateMemoryClusteredJobs(
883 String groupName, TriggerState triggerState) {
884
885 for (ObjectValuePair<SchedulerResponse, TriggerState>
886 memoryClusteredJob : _memoryClusteredJobs.values()) {
887
888 SchedulerResponse schedulerResponse =
889 memoryClusteredJob.getKey();
890
891 if (groupName.equals(schedulerResponse.getGroupName())) {
892 memoryClusteredJob.setValue(triggerState);
893 }
894 }
895 }
896
897 @BeanReference(
898 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
899 protected SchedulerEngine schedulerEngine;
900
901 private static final String _LOCK_CLASS_NAME =
902 SchedulerEngine.class.getName();
903
904 private static Log _log = LogFactoryUtil.getLog(
905 ClusterSchedulerEngine.class);
906
907 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
908 SchedulerEngine.class.getName(), "getScheduledJob", String.class,
909 String.class);
910 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
911 SchedulerEngine.class.getName(), "getScheduledJobs");
912 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
913 SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
914 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
915 SchedulerEngineUtil.class.getName(), "getScheduledJobs",
916 StorageType.class);
917
918 private String _beanIdentifier;
919 private ClusterEventListener _clusterEventListener;
920 private volatile String _localClusterNodeAddress;
921 private volatile boolean _master;
922 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
923 _memoryClusteredJobs = new ConcurrentHashMap
924 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
925 private java.util.concurrent.locks.Lock _readLock;
926 private SchedulerEngine _schedulerEngine;
927 private java.util.concurrent.locks.Lock _writeLock;
928
929 private class MemorySchedulerClusterEventListener
930 implements ClusterEventListener {
931
932 public void processClusterEvent(ClusterEvent clusterEvent) {
933 ClusterEventType clusterEventType =
934 clusterEvent.getClusterEventType();
935
936 if (!clusterEventType.equals(ClusterEventType.DEPART)) {
937 return;
938 }
939
940 try {
941 updateMemorySchedulerClusterMaster();
942 }
943 catch (Exception e) {
944 _log.error("Unable to update memory scheduler cluster lock", e);
945 }
946 }
947
948 }
949
950 }