001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.IdentifiableBean;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
021 import com.liferay.portal.kernel.cluster.ClusterEvent;
022 import com.liferay.portal.kernel.cluster.ClusterEventListener;
023 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.Clusterable;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.messaging.Message;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037 import com.liferay.portal.kernel.scheduler.SchedulerException;
038 import com.liferay.portal.kernel.scheduler.StorageType;
039 import com.liferay.portal.kernel.scheduler.Trigger;
040 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041 import com.liferay.portal.kernel.scheduler.TriggerState;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
044 import com.liferay.portal.kernel.util.Base64;
045 import com.liferay.portal.kernel.util.CharPool;
046 import com.liferay.portal.kernel.util.MethodHandler;
047 import com.liferay.portal.kernel.util.MethodKey;
048 import com.liferay.portal.kernel.util.ObjectValuePair;
049 import com.liferay.portal.kernel.util.StringPool;
050 import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
051 import com.liferay.portal.model.Lock;
052 import com.liferay.portal.service.LockLocalServiceUtil;
053 import com.liferay.portal.util.PropsValues;
054
055 import java.io.ObjectInputStream;
056 import java.io.ObjectOutputStream;
057
058 import java.util.Iterator;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Set;
062 import java.util.concurrent.ConcurrentHashMap;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.TimeoutException;
065 import java.util.concurrent.locks.ReadWriteLock;
066 import java.util.concurrent.locks.ReentrantReadWriteLock;
067
068
071 public class ClusterSchedulerEngine
072 implements IdentifiableBean, SchedulerEngine,
073 SchedulerEngineClusterManager {
074
075 public static SchedulerEngine createClusterSchedulerEngine(
076 SchedulerEngine schedulerEngine) {
077
078 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080 }
081
082 return schedulerEngine;
083 }
084
085 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
086 _schedulerEngine = schedulerEngine;
087 }
088
089 @Clusterable
090 public void delete(String groupName) throws SchedulerException {
091 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
092
093 _readLock.lock();
094
095 try {
096 if (memoryClusteredSlaveJob) {
097 removeMemoryClusteredJobs(groupName);
098 }
099 else {
100 _schedulerEngine.delete(groupName);
101 }
102 }
103 finally {
104 _readLock.unlock();
105 }
106
107 skipClusterInvoking(groupName);
108 }
109
110 @Clusterable
111 public void delete(String jobName, String groupName)
112 throws SchedulerException {
113
114 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
115
116 _readLock.lock();
117
118 try {
119 if (memoryClusteredSlaveJob) {
120 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
121 }
122 else {
123 _schedulerEngine.delete(jobName, groupName);
124 }
125 }
126 finally {
127 _readLock.unlock();
128 }
129
130 skipClusterInvoking(groupName);
131 }
132
133 public String getBeanIdentifier() {
134 return _beanIdentifier;
135 }
136
137 public SchedulerResponse getScheduledJob(String jobName, String groupName)
138 throws SchedulerException {
139
140 StorageType storageType = getStorageType(groupName);
141
142 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
143 String masterAddressString = getMasterAddressString(false);
144
145 if (!_localClusterNodeAddress.equals(masterAddressString)) {
146 return (SchedulerResponse)callMaster(
147 masterAddressString, _getScheduledJobMethodKey, jobName,
148 groupName);
149 }
150 }
151
152 _readLock.lock();
153
154 try {
155 return _schedulerEngine.getScheduledJob(jobName, groupName);
156 }
157 finally {
158 _readLock.unlock();
159 }
160 }
161
162 public List<SchedulerResponse> getScheduledJobs()
163 throws SchedulerException {
164
165 String masterAddressString = getMasterAddressString(false);
166
167 if (!_localClusterNodeAddress.equals(masterAddressString)) {
168 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
169 }
170
171 _readLock.lock();
172
173 try {
174 return _schedulerEngine.getScheduledJobs();
175 }
176 finally {
177 _readLock.unlock();
178 }
179 }
180
181 public List<SchedulerResponse> getScheduledJobs(String groupName)
182 throws SchedulerException {
183
184 StorageType storageType = getStorageType(groupName);
185
186 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
187 String masterAddressString = getMasterAddressString(false);
188
189 if (!_localClusterNodeAddress.equals(masterAddressString)) {
190 return callMaster(
191 masterAddressString, _getScheduledJobsMethodKey2,
192 groupName);
193 }
194 }
195
196 _readLock.lock();
197
198 try {
199 return _schedulerEngine.getScheduledJobs(groupName);
200 }
201 finally {
202 _readLock.unlock();
203 }
204 }
205
206 public void initialize() throws SchedulerException {
207 try {
208 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
209
210 _readLock = readWriteLock.readLock();
211 _writeLock = readWriteLock.writeLock();
212
213 _localClusterNodeAddress = getSerializedString(
214 ClusterExecutorUtil.getLocalClusterNodeAddress());
215
216 _clusterEventListener = new MemorySchedulerClusterEventListener();
217
218 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
219
220 String masterAddressString = getMasterAddressString(false);
221
222 if (!_localClusterNodeAddress.equals(masterAddressString)) {
223 List<SchedulerResponse> schedulerResponses = callMaster(
224 masterAddressString, _getScheduledJobsMethodKey3,
225 StorageType.MEMORY_CLUSTERED);
226
227 initMemoryClusteredJobs(schedulerResponses);
228 }
229 }
230 catch (Exception e) {
231 throw new SchedulerException("Unable to initialize scheduler", e);
232 }
233 }
234
235 @Clusterable
236 public void pause(String groupName) throws SchedulerException {
237 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
238
239 _readLock.lock();
240
241 try {
242 if (memoryClusteredSlaveJob) {
243 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
244
245 return;
246 }
247
248 _schedulerEngine.pause(groupName);
249 }
250 finally {
251 _readLock.unlock();
252 }
253
254 skipClusterInvoking(groupName);
255 }
256
257 @Clusterable
258 public void pause(String jobName, String groupName)
259 throws SchedulerException {
260
261 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
262
263 _readLock.lock();
264
265 try {
266 if (memoryClusteredSlaveJob) {
267 updateMemoryClusteredJob(
268 jobName, groupName, TriggerState.PAUSED);
269
270 return;
271 }
272
273 _schedulerEngine.pause(jobName, groupName);
274 }
275 finally {
276 _readLock.unlock();
277 }
278
279 skipClusterInvoking(groupName);
280 }
281
282 @Clusterable
283 public void resume(String groupName) throws SchedulerException {
284 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
285
286 _readLock.lock();
287
288 try {
289 if (memoryClusteredSlaveJob) {
290 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
291
292 return;
293 }
294
295 _schedulerEngine.resume(groupName);
296 }
297 finally {
298 _readLock.unlock();
299 }
300
301 skipClusterInvoking(groupName);
302 }
303
304 @Clusterable
305 public void resume(String jobName, String groupName)
306 throws SchedulerException {
307
308 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
309
310 _readLock.lock();
311
312 try {
313 if (memoryClusteredSlaveJob) {
314 updateMemoryClusteredJob(
315 jobName, groupName, TriggerState.NORMAL);
316
317 return;
318 }
319
320 _schedulerEngine.resume(jobName, groupName);
321 }
322 finally {
323 _readLock.unlock();
324 }
325
326 skipClusterInvoking(groupName);
327 }
328
329 @Clusterable
330 public void schedule(
331 Trigger trigger, String description, String destinationName,
332 Message message)
333 throws SchedulerException {
334
335 String groupName = trigger.getGroupName();
336 String jobName = trigger.getJobName();
337
338 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
339
340 _readLock.lock();
341
342 try {
343 if (memoryClusteredSlaveJob) {
344 SchedulerResponse schedulerResponse = new SchedulerResponse();
345
346 schedulerResponse.setDescription(description);
347 schedulerResponse.setDestinationName(destinationName);
348 schedulerResponse.setGroupName(groupName);
349 schedulerResponse.setJobName(jobName);
350 schedulerResponse.setMessage(message);
351 schedulerResponse.setTrigger(trigger);
352
353 _memoryClusteredJobs.put(
354 getFullName(jobName, groupName),
355 new ObjectValuePair<SchedulerResponse, TriggerState>(
356 schedulerResponse, TriggerState.NORMAL));
357
358 return;
359 }
360
361 _schedulerEngine.schedule(
362 trigger, description, destinationName, message);
363 }
364 finally {
365 _readLock.unlock();
366 }
367
368 skipClusterInvoking(groupName);
369 }
370
371 public void setBeanIdentifier(String beanIdentifier) {
372 _beanIdentifier = beanIdentifier;
373 }
374
375 public void shutdown() throws SchedulerException {
376 try {
377 ClusterExecutorUtil.removeClusterEventListener(
378 _clusterEventListener);
379
380 LockLocalServiceUtil.unlock(
381 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
382 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
383 }
384 catch (Exception e) {
385 throw new SchedulerException("Unable to shutdown scheduler", e);
386 }
387
388 _schedulerEngine.shutdown();
389 }
390
391 public void start() throws SchedulerException {
392 _schedulerEngine.start();
393 }
394
395 @Clusterable
396 public void suppressError(String jobName, String groupName)
397 throws SchedulerException {
398
399 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
400
401 if (memoryClusteredSlaveJob) {
402 return;
403 }
404
405 _readLock.lock();
406
407 try {
408 _schedulerEngine.suppressError(jobName, groupName);
409 }
410 finally {
411 _readLock.unlock();
412 }
413
414 skipClusterInvoking(groupName);
415 }
416
417 @Clusterable
418 public void unschedule(String groupName) throws SchedulerException {
419 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420
421 _readLock.lock();
422
423 try {
424 if (memoryClusteredSlaveJob) {
425 removeMemoryClusteredJobs(groupName);
426 }
427 else {
428 _schedulerEngine.unschedule(groupName);
429 }
430 }
431 finally {
432 _readLock.unlock();
433 }
434
435 skipClusterInvoking(groupName);
436 }
437
438 @Clusterable
439 public void unschedule(String jobName, String groupName)
440 throws SchedulerException {
441
442 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443
444 _readLock.lock();
445
446 try {
447 if (memoryClusteredSlaveJob) {
448 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449 }
450 else {
451 _schedulerEngine.unschedule(jobName, groupName);
452 }
453 }
454 finally {
455 _readLock.unlock();
456 }
457
458 skipClusterInvoking(groupName);
459 }
460
461 @Clusterable
462 public void update(Trigger trigger) throws SchedulerException {
463 String jobName = trigger.getJobName();
464 String groupName = trigger.getGroupName();
465
466 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467
468 _readLock.lock();
469
470 try {
471 if (memoryClusteredSlaveJob) {
472 for (ObjectValuePair<SchedulerResponse, TriggerState>
473 memoryClusteredJob : _memoryClusteredJobs.values()) {
474
475 SchedulerResponse schedulerResponse =
476 memoryClusteredJob.getKey();
477
478 if (jobName.equals(schedulerResponse.getJobName()) &&
479 groupName.equals(schedulerResponse.getGroupName())) {
480
481 schedulerResponse.setTrigger(trigger);
482
483 return;
484 }
485 }
486
487 throw new SchedulerException(
488 "Unable to update trigger for memory clustered job");
489 }
490
491 _schedulerEngine.update(trigger);
492 }
493 finally {
494 _readLock.unlock();
495 }
496
497 skipClusterInvoking(groupName);
498 }
499
500 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
501 getMasterAddressString(false);
502
503 return null;
504 }
505
506 protected <T> T callMaster(
507 String masterAddressString, MethodKey methodKey,
508 Object... arguments)
509 throws SchedulerException {
510
511 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
512
513 Address address = (Address)getDeserializedObject(masterAddressString);
514
515 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
516 methodHandler, address);
517
518 clusterRequest.setBeanIdentifier(_beanIdentifier);
519
520 try {
521 FutureClusterResponses futureClusterResponses =
522 ClusterExecutorUtil.execute(clusterRequest);
523
524 ClusterNodeResponses clusterNodeResponses =
525 futureClusterResponses.get(20, TimeUnit.SECONDS);
526
527 ClusterNodeResponse clusterNodeResponse =
528 clusterNodeResponses.getClusterResponse(address);
529
530 return (T)clusterNodeResponse.getResult();
531 }
532 catch (Exception e) {
533 throw new SchedulerException(
534 "Unable to load scheduled jobs from cluster node " +
535 address.getDescription(),
536 e);
537 }
538 }
539
540 protected Object getDeserializedObject(String string)
541 throws SchedulerException {
542
543 byte[] bytes = Base64.decode(string);
544
545 UnsyncByteArrayInputStream byteArrayInputStream =
546 new UnsyncByteArrayInputStream(bytes);
547
548 ObjectInputStream objectInputStream = null;
549
550 try {
551 objectInputStream = new ObjectInputStream(byteArrayInputStream);
552
553 Object object = objectInputStream.readObject();
554
555 return object;
556 }
557
558 catch(Exception e) {
559 throw new SchedulerException(
560 "Unable to deserialize object from " + string, e);
561 }
562 finally {
563 try {
564 objectInputStream.close();
565 }
566
567 catch(Exception e) {
568 }
569 }
570 }
571
572 protected String getFullName(String jobName, String groupName) {
573 return groupName.concat(StringPool.PERIOD).concat(jobName);
574 }
575
576 protected String getMasterAddressString(boolean asynchronous)
577 throws SchedulerException {
578
579 String owner = null;
580
581 Lock lock = null;
582
583 while (true) {
584 try {
585 if (owner == null) {
586 lock = LockLocalServiceUtil.lock(
587 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
588 _localClusterNodeAddress,
589 PropsValues.
590 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
591 }
592 else {
593 lock = LockLocalServiceUtil.lock(
594 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
595 _localClusterNodeAddress,
596 PropsValues.
597 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
598 }
599
600 Address address = (Address)getDeserializedObject(
601 lock.getOwner());
602
603 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
604 break;
605 }
606 else {
607 owner = lock.getOwner();
608 }
609 }
610 catch (Exception e) {
611 if (_log.isWarnEnabled()) {
612 _log.warn(
613 "Unable to obtain memory scheduler cluster lock. " +
614 "Trying again.");
615 }
616 }
617 }
618
619 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
620
621 if (master == _master) {
622 return lock.getOwner();
623 }
624
625 if (master) {
626 slaveToMaster();
627 }
628 else {
629 masterToSlave(lock.getOwner(), asynchronous);
630 }
631
632 return lock.getOwner();
633 }
634
635 protected String getSerializedString(Object object) throws Exception {
636 UnsyncByteArrayOutputStream byteArrayOutputStream =
637 new UnsyncByteArrayOutputStream();
638
639 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
640 byteArrayOutputStream);
641
642 objectOutputStream.writeObject(object);
643 objectOutputStream.close();
644
645 byte[] bytes = byteArrayOutputStream.toByteArray();
646
647 return Base64.encode(bytes);
648 }
649
650 protected StorageType getStorageType(String groupName) {
651 int pos = groupName.indexOf(CharPool.POUND);
652
653 String storageTypeString = groupName.substring(0, pos);
654
655 return StorageType.valueOf(storageTypeString);
656 }
657
658 protected void initMemoryClusteredJobs(
659 List<SchedulerResponse> schedulerResponses)
660 throws Exception {
661
662 for (SchedulerResponse schedulerResponse : schedulerResponses) {
663 Trigger oldTrigger = schedulerResponse.getTrigger();
664
665 String jobName = schedulerResponse.getJobName();
666 String groupName = SchedulerEngineUtil.namespaceGroupName(
667 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
668
669 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
670 oldTrigger.getTriggerType(), jobName, groupName,
671 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
672 oldTrigger.getTriggerContent());
673
674 schedulerResponse.setTrigger(newTrigger);
675
676 TriggerState triggerState = SchedulerEngineUtil.getJobState(
677 schedulerResponse);
678
679 Message message = schedulerResponse.getMessage();
680
681 message.remove(JOB_STATE);
682
683 _memoryClusteredJobs.put(
684 getFullName(jobName, groupName),
685 new ObjectValuePair<SchedulerResponse, TriggerState>(
686 schedulerResponse, triggerState));
687 }
688 }
689
690 protected boolean isMemoryClusteredSlaveJob(String groupName)
691 throws SchedulerException {
692
693 StorageType storageType = getStorageType(groupName);
694
695 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
696 return false;
697 }
698
699 String masterAddressString = getMasterAddressString(false);
700
701 if (_localClusterNodeAddress.equals(masterAddressString)) {
702 return false;
703 }
704
705 return true;
706 }
707
708 protected void masterToSlave(
709 String masterAddressString, boolean asynchronous)
710 throws SchedulerException {
711
712 if (asynchronous) {
713 MethodHandler methodHandler = new MethodHandler(
714 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
715
716 Address address = (Address)getDeserializedObject(
717 masterAddressString);
718
719 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
720 methodHandler, address);
721
722 clusterRequest.setBeanIdentifier(_beanIdentifier);
723
724 try {
725 ClusterExecutorUtil.execute(
726 clusterRequest,
727 new MemorySchedulerClusterResponseCallback(address), 20,
728 TimeUnit.SECONDS);
729
730 return;
731 }
732 catch (Exception e) {
733 throw new SchedulerException(
734 "Unable to load scheduled jobs from cluster node " +
735 address.getDescription(),
736 e);
737 }
738 }
739
740 List<SchedulerResponse> schedulerResponses = callMaster(
741 masterAddressString, _getScheduledJobsMethodKey3,
742 StorageType.MEMORY_CLUSTERED);
743
744 _doMasterToSlave(schedulerResponses);
745 }
746
747 protected void removeMemoryClusteredJobs(String groupName) {
748 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
749 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
750
751 Iterator
752 <Map.Entry<String,
753 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
754 memoryClusteredJobs.iterator();
755
756 while (itr.hasNext()) {
757 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
758 entry = itr.next();
759
760 ObjectValuePair<SchedulerResponse, TriggerState>
761 memoryClusteredJob = entry.getValue();
762
763 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
764
765 if (groupName.equals(schedulerResponse.getGroupName())) {
766 itr.remove();
767 }
768 }
769 }
770
771 protected void skipClusterInvoking(String groupName)
772 throws SchedulerException {
773
774 StorageType storageType = getStorageType(groupName);
775
776 if (storageType.equals(StorageType.PERSISTED) ||
777 PluginContextLifecycleThreadLocal.isDestroying()) {
778
779 SchedulerException schedulerException = new SchedulerException();
780
781 schedulerException.setSwallowable(true);
782
783 throw schedulerException;
784 }
785 }
786
787 protected void slaveToMaster() throws SchedulerException {
788 boolean forceSync = ProxyModeThreadLocal.isForceSync();
789
790 ProxyModeThreadLocal.setForceSync(true);
791
792 _writeLock.lock();
793
794 try {
795 for (ObjectValuePair<SchedulerResponse, TriggerState>
796 memoryClusteredJob : _memoryClusteredJobs.values()) {
797
798 SchedulerResponse schedulerResponse =
799 memoryClusteredJob.getKey();
800
801 _schedulerEngine.schedule(
802 schedulerResponse.getTrigger(),
803 schedulerResponse.getDescription(),
804 schedulerResponse.getDestinationName(),
805 schedulerResponse.getMessage());
806
807 TriggerState triggerState = memoryClusteredJob.getValue();
808
809 if (triggerState.equals(TriggerState.PAUSED)) {
810 _schedulerEngine.pause(
811 schedulerResponse.getJobName(),
812 schedulerResponse.getGroupName());
813 }
814 }
815
816 _memoryClusteredJobs.clear();
817 }
818 finally {
819 ProxyModeThreadLocal.setForceSync(forceSync);
820
821 _master = true;
822
823 _writeLock.unlock();
824 }
825 }
826
827 protected void updateMemoryClusteredJob(
828 String jobName, String groupName, TriggerState triggerState) {
829
830 ObjectValuePair<SchedulerResponse, TriggerState>
831 memoryClusteredJob = _memoryClusteredJobs.get(
832 getFullName(jobName, groupName));
833
834 if (memoryClusteredJob != null) {
835 memoryClusteredJob.setValue(triggerState);
836 }
837 }
838
839 protected void updateMemoryClusteredJobs(
840 String groupName, TriggerState triggerState) {
841
842 for (ObjectValuePair<SchedulerResponse, TriggerState>
843 memoryClusteredJob : _memoryClusteredJobs.values()) {
844
845 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
846
847 if (groupName.equals(schedulerResponse.getGroupName())) {
848 memoryClusteredJob.setValue(triggerState);
849 }
850 }
851 }
852
853 @BeanReference(
854 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
855 protected SchedulerEngine schedulerEngine;
856
857 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
858 throws SchedulerException {
859
860 _writeLock.lock();
861
862 try {
863 for (SchedulerResponse schedulerResponse :
864 _schedulerEngine.getScheduledJobs()) {
865
866 if (StorageType.MEMORY_CLUSTERED ==
867 schedulerResponse.getStorageType()) {
868
869 _schedulerEngine.delete(
870 schedulerResponse.getJobName(),
871 schedulerResponse.getGroupName());
872 }
873 }
874
875 initMemoryClusteredJobs(schedulerResponses);
876
877 if (_log.isInfoEnabled()) {
878 _log.info("Switched current node from master to slave");
879 }
880 }
881 catch (Exception e) {
882 throw new SchedulerException(e);
883 }
884 finally {
885 _master = false;
886
887 _writeLock.unlock();
888 }
889 }
890
891 private static final String _LOCK_CLASS_NAME =
892 SchedulerEngine.class.getName();
893
894 private static Log _log = LogFactoryUtil.getLog(
895 ClusterSchedulerEngine.class);
896
897 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
898 SchedulerEngine.class.getName(), "getScheduledJob", String.class,
899 String.class);
900 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
901 SchedulerEngine.class.getName(), "getScheduledJobs");
902 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
903 SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
904 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
905 SchedulerEngineUtil.class.getName(), "getScheduledJobs",
906 StorageType.class);
907
908 private String _beanIdentifier;
909 private ClusterEventListener _clusterEventListener;
910 private volatile String _localClusterNodeAddress;
911 private volatile boolean _master;
912 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
913 _memoryClusteredJobs = new ConcurrentHashMap
914 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
915 private java.util.concurrent.locks.Lock _readLock;
916 private SchedulerEngine _schedulerEngine;
917 private java.util.concurrent.locks.Lock _writeLock;
918
919 private class MemorySchedulerClusterEventListener
920 implements ClusterEventListener {
921
922 public void processClusterEvent(ClusterEvent clusterEvent) {
923 try {
924 getMasterAddressString(true);
925 }
926 catch (Exception e) {
927 _log.error("Unable to update memory scheduler cluster lock", e);
928 }
929 }
930
931 }
932
933 private class MemorySchedulerClusterResponseCallback
934 extends BaseClusterResponseCallback {
935
936 public MemorySchedulerClusterResponseCallback(Address address) {
937 _address = address;
938 }
939
940 @Override
941 public void callback(ClusterNodeResponses clusterNodeResponses) {
942 try {
943 ClusterNodeResponse clusterNodeResponse =
944 clusterNodeResponses.getClusterResponse(_address);
945
946 List<SchedulerResponse> schedulerResponses =
947 (List<SchedulerResponse>)clusterNodeResponse.getResult();
948
949 _doMasterToSlave(schedulerResponses);
950 }
951 catch (Exception e) {
952 _log.error(
953 "Unable to load memory clustered jobs from cluster node " +
954 _address.getDescription(),
955 e);
956 }
957 }
958
959 @Override
960 public void processTimeoutException(TimeoutException timeoutException) {
961 _log.error(
962 "Unable to load memory clustered jobs from cluster node " +
963 _address.getDescription(),
964 timeoutException);
965 }
966
967 private Address _address;
968
969 }
970
971 }