001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.IdentifiableBean;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
021 import com.liferay.portal.kernel.cluster.ClusterEvent;
022 import com.liferay.portal.kernel.cluster.ClusterEventListener;
023 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.Clusterable;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.messaging.Message;
034 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
036 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
037 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
038 import com.liferay.portal.kernel.scheduler.SchedulerException;
039 import com.liferay.portal.kernel.scheduler.StorageType;
040 import com.liferay.portal.kernel.scheduler.Trigger;
041 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
042 import com.liferay.portal.kernel.scheduler.TriggerState;
043 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
044 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
045 import com.liferay.portal.kernel.util.Base64;
046 import com.liferay.portal.kernel.util.CharPool;
047 import com.liferay.portal.kernel.util.MethodHandler;
048 import com.liferay.portal.kernel.util.MethodKey;
049 import com.liferay.portal.kernel.util.ObjectValuePair;
050 import com.liferay.portal.kernel.util.StringPool;
051 import com.liferay.portal.model.Lock;
052 import com.liferay.portal.service.LockLocalServiceUtil;
053 import com.liferay.portal.util.PropsValues;
054
055 import java.io.ObjectInputStream;
056 import java.io.ObjectOutputStream;
057
058 import java.util.Iterator;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Set;
062 import java.util.concurrent.ConcurrentHashMap;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.TimeoutException;
065 import java.util.concurrent.locks.ReadWriteLock;
066 import java.util.concurrent.locks.ReentrantReadWriteLock;
067
068
071 public class ClusterSchedulerEngine
072 implements IdentifiableBean, SchedulerEngine,
073 SchedulerEngineClusterManager {
074
075 public static SchedulerEngine createClusterSchedulerEngine(
076 SchedulerEngine schedulerEngine) {
077
078 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080 }
081
082 return schedulerEngine;
083 }
084
085 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
086 _schedulerEngine = schedulerEngine;
087 }
088
089 @Clusterable
090 public void delete(String groupName) throws SchedulerException {
091 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
092
093 _readLock.lock();
094
095 try {
096 if (memoryClusteredSlaveJob) {
097 removeMemoryClusteredJobs(groupName);
098 }
099 else {
100 _schedulerEngine.delete(groupName);
101 }
102 }
103 finally {
104 _readLock.unlock();
105 }
106
107 skipClusterInvoking(groupName);
108 }
109
110 @Clusterable
111 public void delete(String jobName, String groupName)
112 throws SchedulerException {
113
114 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
115
116 _readLock.lock();
117
118 try {
119 if (memoryClusteredSlaveJob) {
120 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
121 }
122 else {
123 _schedulerEngine.delete(jobName, groupName);
124 }
125 }
126 finally {
127 _readLock.unlock();
128 }
129
130 skipClusterInvoking(groupName);
131 }
132
133 public String getBeanIdentifier() {
134 return _beanIdentifier;
135 }
136
137 public SchedulerResponse getScheduledJob(String jobName, String groupName)
138 throws SchedulerException {
139
140 StorageType storageType = getStorageType(groupName);
141
142 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
143 String masterAddressString = getMasterAddressString(false);
144
145 if (!_localClusterNodeAddress.equals(masterAddressString)) {
146 return (SchedulerResponse)callMaster(
147 masterAddressString, _getScheduledJobMethodKey, jobName,
148 groupName);
149 }
150 }
151
152 _readLock.lock();
153
154 try {
155 return _schedulerEngine.getScheduledJob(jobName, groupName);
156 }
157 finally {
158 _readLock.unlock();
159 }
160 }
161
162 public List<SchedulerResponse> getScheduledJobs()
163 throws SchedulerException {
164
165 String masterAddressString = getMasterAddressString(false);
166
167 if (!_localClusterNodeAddress.equals(masterAddressString)) {
168 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
169 }
170
171 _readLock.lock();
172
173 try {
174 return _schedulerEngine.getScheduledJobs();
175 }
176 finally {
177 _readLock.unlock();
178 }
179 }
180
181 public List<SchedulerResponse> getScheduledJobs(String groupName)
182 throws SchedulerException {
183
184 StorageType storageType = getStorageType(groupName);
185
186 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
187 String masterAddressString = getMasterAddressString(false);
188
189 if (!_localClusterNodeAddress.equals(masterAddressString)) {
190 return callMaster(
191 masterAddressString, _getScheduledJobsMethodKey2,
192 groupName);
193 }
194 }
195
196 _readLock.lock();
197
198 try {
199 return _schedulerEngine.getScheduledJobs(groupName);
200 }
201 finally {
202 _readLock.unlock();
203 }
204 }
205
206 public void initialize() throws SchedulerException {
207 try {
208 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
209
210 _readLock = readWriteLock.readLock();
211 _writeLock = readWriteLock.writeLock();
212
213 _localClusterNodeAddress = getSerializedString(
214 ClusterExecutorUtil.getLocalClusterNodeAddress());
215
216 _clusterEventListener = new MemorySchedulerClusterEventListener();
217
218 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
219
220 String masterAddressString = getMasterAddressString(false);
221
222 if (!_localClusterNodeAddress.equals(masterAddressString)) {
223 List<SchedulerResponse> schedulerResponses = callMaster(
224 masterAddressString, _getScheduledJobsMethodKey3,
225 StorageType.MEMORY_CLUSTERED);
226
227 initMemoryClusteredJobs(schedulerResponses);
228 }
229 }
230 catch (Exception e) {
231 throw new SchedulerException("Unable to initialize scheduler", e);
232 }
233 }
234
235 @Clusterable
236 public void pause(String groupName) throws SchedulerException {
237 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
238
239 _readLock.lock();
240
241 try {
242 if (memoryClusteredSlaveJob) {
243 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
244
245 return;
246 }
247
248 _schedulerEngine.pause(groupName);
249 }
250 finally {
251 _readLock.unlock();
252 }
253
254 skipClusterInvoking(groupName);
255 }
256
257 @Clusterable
258 public void pause(String jobName, String groupName)
259 throws SchedulerException {
260
261 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
262
263 _readLock.lock();
264
265 try {
266 if (memoryClusteredSlaveJob) {
267 updateMemoryClusteredJob(
268 jobName, groupName, TriggerState.PAUSED);
269
270 return;
271 }
272
273 _schedulerEngine.pause(jobName, groupName);
274 }
275 finally {
276 _readLock.unlock();
277 }
278
279 skipClusterInvoking(groupName);
280 }
281
282 @Clusterable
283 public void resume(String groupName) throws SchedulerException {
284 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
285
286 _readLock.lock();
287
288 try {
289 if (memoryClusteredSlaveJob) {
290 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
291
292 return;
293 }
294
295 _schedulerEngine.resume(groupName);
296 }
297 finally {
298 _readLock.unlock();
299 }
300
301 skipClusterInvoking(groupName);
302 }
303
304 @Clusterable
305 public void resume(String jobName, String groupName)
306 throws SchedulerException {
307
308 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
309
310 _readLock.lock();
311
312 try {
313 if (memoryClusteredSlaveJob) {
314 updateMemoryClusteredJob(
315 jobName, groupName, TriggerState.NORMAL);
316
317 return;
318 }
319
320 _schedulerEngine.resume(jobName, groupName);
321 }
322 finally {
323 _readLock.unlock();
324 }
325
326 skipClusterInvoking(groupName);
327 }
328
329 @Clusterable
330 public void schedule(
331 Trigger trigger, String description, String destinationName,
332 Message message)
333 throws SchedulerException {
334
335 String groupName = trigger.getGroupName();
336 String jobName = trigger.getJobName();
337
338 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
339
340 _readLock.lock();
341
342 try {
343 if (memoryClusteredSlaveJob) {
344 SchedulerResponse schedulerResponse = new SchedulerResponse();
345
346 schedulerResponse.setDescription(description);
347 schedulerResponse.setDestinationName(destinationName);
348 schedulerResponse.setGroupName(groupName);
349 schedulerResponse.setJobName(jobName);
350 schedulerResponse.setMessage(message);
351 schedulerResponse.setTrigger(trigger);
352
353 _memoryClusteredJobs.put(
354 getFullName(jobName, groupName),
355 new ObjectValuePair<SchedulerResponse, TriggerState>(
356 schedulerResponse, TriggerState.NORMAL));
357
358 return;
359 }
360
361 _schedulerEngine.schedule(
362 trigger, description, destinationName, message);
363 }
364 finally {
365 _readLock.unlock();
366 }
367
368 skipClusterInvoking(groupName);
369 }
370
371 public void setBeanIdentifier(String beanIdentifier) {
372 _beanIdentifier = beanIdentifier;
373 }
374
375 public void shutdown() throws SchedulerException {
376 try {
377 ClusterExecutorUtil.removeClusterEventListener(
378 _clusterEventListener);
379
380 LockLocalServiceUtil.unlock(
381 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
382 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
383 }
384 catch (Exception e) {
385 throw new SchedulerException("Unable to shutdown scheduler", e);
386 }
387
388 _schedulerEngine.shutdown();
389 }
390
391 public void start() throws SchedulerException {
392 _schedulerEngine.start();
393 }
394
395 @Clusterable
396 public void suppressError(String jobName, String groupName)
397 throws SchedulerException {
398
399 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
400
401 if (memoryClusteredSlaveJob) {
402 return;
403 }
404
405 _readLock.lock();
406
407 try {
408 _schedulerEngine.suppressError(jobName, groupName);
409 }
410 finally {
411 _readLock.unlock();
412 }
413
414 skipClusterInvoking(groupName);
415 }
416
417 @Clusterable
418 public void unschedule(String groupName) throws SchedulerException {
419 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420
421 _readLock.lock();
422
423 try {
424 if (memoryClusteredSlaveJob) {
425 removeMemoryClusteredJobs(groupName);
426 }
427 else {
428 _schedulerEngine.unschedule(groupName);
429 }
430 }
431 finally {
432 _readLock.unlock();
433 }
434
435 skipClusterInvoking(groupName);
436 }
437
438 @Clusterable
439 public void unschedule(String jobName, String groupName)
440 throws SchedulerException {
441
442 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443
444 _readLock.lock();
445
446 try {
447 if (memoryClusteredSlaveJob) {
448 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449 }
450 else {
451 _schedulerEngine.unschedule(jobName, groupName);
452 }
453 }
454 finally {
455 _readLock.unlock();
456 }
457
458 skipClusterInvoking(groupName);
459 }
460
461 @Clusterable
462 public void update(Trigger trigger) throws SchedulerException {
463 String jobName = trigger.getJobName();
464 String groupName = trigger.getGroupName();
465
466 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467
468 _readLock.lock();
469
470 try {
471 if (memoryClusteredSlaveJob) {
472 for (ObjectValuePair<SchedulerResponse, TriggerState>
473 memoryClusteredJob : _memoryClusteredJobs.values()) {
474
475 SchedulerResponse schedulerResponse =
476 memoryClusteredJob.getKey();
477
478 if (jobName.equals(schedulerResponse.getJobName()) &&
479 groupName.equals(schedulerResponse.getGroupName())) {
480
481 schedulerResponse.setTrigger(trigger);
482
483 return;
484 }
485 }
486
487 throw new SchedulerException(
488 "Unable to update trigger for memory clustered job");
489 }
490
491 _schedulerEngine.update(trigger);
492 }
493 finally {
494 _readLock.unlock();
495 }
496
497 skipClusterInvoking(groupName);
498 }
499
500 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
501 getMasterAddressString(false);
502
503 return null;
504 }
505
506 protected <T> T callMaster(
507 String masterAddressString, MethodKey methodKey,
508 Object... arguments)
509 throws SchedulerException {
510
511 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
512
513 Address address = (Address)getDeserializedObject(masterAddressString);
514
515 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
516 methodHandler, address);
517
518 clusterRequest.setBeanIdentifier(_beanIdentifier);
519
520 try {
521 FutureClusterResponses futureClusterResponses =
522 ClusterExecutorUtil.execute(clusterRequest);
523
524 ClusterNodeResponses clusterNodeResponses =
525 futureClusterResponses.get(20, TimeUnit.SECONDS);
526
527 ClusterNodeResponse clusterNodeResponse =
528 clusterNodeResponses.getClusterResponse(address);
529
530 return (T)clusterNodeResponse.getResult();
531 }
532 catch (Exception e) {
533 throw new SchedulerException(
534 "Unable to load scheduled jobs from cluster node " +
535 address.getDescription(),
536 e);
537 }
538 }
539
540 protected Object getDeserializedObject(String string)
541 throws SchedulerException {
542
543 byte[] bytes = Base64.decode(string);
544
545 UnsyncByteArrayInputStream byteArrayInputStream =
546 new UnsyncByteArrayInputStream(bytes);
547
548 ObjectInputStream objectInputStream = null;
549
550 try {
551 objectInputStream = new ObjectInputStream(byteArrayInputStream);
552
553 Object object = objectInputStream.readObject();
554
555 return object;
556 }
557 catch (Exception e) {
558 throw new SchedulerException(
559 "Unable to deserialize object from " + string, e);
560 }
561 finally {
562 try {
563 objectInputStream.close();
564 }
565 catch (Exception e) {
566 }
567 }
568 }
569
570 protected String getFullName(String jobName, String groupName) {
571 return groupName.concat(StringPool.PERIOD).concat(jobName);
572 }
573
574 protected String getMasterAddressString(boolean asynchronous)
575 throws SchedulerException {
576
577 String owner = null;
578
579 Lock lock = null;
580
581 while (true) {
582 try {
583 if (owner == null) {
584 lock = LockLocalServiceUtil.lock(
585 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
586 _localClusterNodeAddress,
587 PropsValues.
588 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
589 }
590 else {
591 lock = LockLocalServiceUtil.lock(
592 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
593 _localClusterNodeAddress,
594 PropsValues.
595 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
596 }
597
598 Address address = (Address)getDeserializedObject(
599 lock.getOwner());
600
601 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
602 break;
603 }
604 else {
605 owner = lock.getOwner();
606 }
607 }
608 catch (Exception e) {
609 if (_log.isWarnEnabled()) {
610 _log.warn(
611 "Unable to obtain memory scheduler cluster lock. " +
612 "Trying again.");
613 }
614 }
615 }
616
617 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
618
619 if (master == _master) {
620 return lock.getOwner();
621 }
622
623 if (master) {
624 slaveToMaster();
625 }
626 else {
627 masterToSlave(lock.getOwner(), asynchronous);
628 }
629
630 return lock.getOwner();
631 }
632
633 protected String getSerializedString(Object object) throws Exception {
634 UnsyncByteArrayOutputStream byteArrayOutputStream =
635 new UnsyncByteArrayOutputStream();
636
637 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
638 byteArrayOutputStream);
639
640 objectOutputStream.writeObject(object);
641 objectOutputStream.close();
642
643 byte[] bytes = byteArrayOutputStream.toByteArray();
644
645 return Base64.encode(bytes);
646 }
647
648 protected StorageType getStorageType(String groupName) {
649 int pos = groupName.indexOf(CharPool.POUND);
650
651 String storageTypeString = groupName.substring(0, pos);
652
653 return StorageType.valueOf(storageTypeString);
654 }
655
656 protected void initMemoryClusteredJobs(
657 List<SchedulerResponse> schedulerResponses)
658 throws Exception {
659
660 for (SchedulerResponse schedulerResponse : schedulerResponses) {
661 Trigger oldTrigger = schedulerResponse.getTrigger();
662
663 String jobName = schedulerResponse.getJobName();
664 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
665 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
666
667 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
668 oldTrigger.getTriggerType(), jobName, groupName,
669 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
670 oldTrigger.getTriggerContent());
671
672 schedulerResponse.setTrigger(newTrigger);
673
674 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
675 schedulerResponse);
676
677 Message message = schedulerResponse.getMessage();
678
679 message.remove(JOB_STATE);
680
681 _memoryClusteredJobs.put(
682 getFullName(jobName, groupName),
683 new ObjectValuePair<SchedulerResponse, TriggerState>(
684 schedulerResponse, triggerState));
685 }
686 }
687
688 protected boolean isMemoryClusteredSlaveJob(String groupName)
689 throws SchedulerException {
690
691 StorageType storageType = getStorageType(groupName);
692
693 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
694 return false;
695 }
696
697 String masterAddressString = getMasterAddressString(false);
698
699 if (_localClusterNodeAddress.equals(masterAddressString)) {
700 return false;
701 }
702
703 return true;
704 }
705
706 protected void masterToSlave(
707 String masterAddressString, boolean asynchronous)
708 throws SchedulerException {
709
710 if (asynchronous) {
711 MethodHandler methodHandler = new MethodHandler(
712 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
713
714 Address address = (Address)getDeserializedObject(
715 masterAddressString);
716
717 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
718 methodHandler, address);
719
720 clusterRequest.setBeanIdentifier(_beanIdentifier);
721
722 try {
723 ClusterExecutorUtil.execute(
724 clusterRequest,
725 new MemorySchedulerClusterResponseCallback(address), 20,
726 TimeUnit.SECONDS);
727
728 return;
729 }
730 catch (Exception e) {
731 throw new SchedulerException(
732 "Unable to load scheduled jobs from cluster node " +
733 address.getDescription(),
734 e);
735 }
736 }
737
738 List<SchedulerResponse> schedulerResponses = callMaster(
739 masterAddressString, _getScheduledJobsMethodKey3,
740 StorageType.MEMORY_CLUSTERED);
741
742 _doMasterToSlave(schedulerResponses);
743 }
744
745 protected void removeMemoryClusteredJobs(String groupName) {
746 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
747 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
748
749 Iterator
750 <Map.Entry<String,
751 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
752 memoryClusteredJobs.iterator();
753
754 while (itr.hasNext()) {
755 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
756 entry = itr.next();
757
758 ObjectValuePair<SchedulerResponse, TriggerState>
759 memoryClusteredJob = entry.getValue();
760
761 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
762
763 if (groupName.equals(schedulerResponse.getGroupName())) {
764 itr.remove();
765 }
766 }
767 }
768
769 protected void skipClusterInvoking(String groupName)
770 throws SchedulerException {
771
772 StorageType storageType = getStorageType(groupName);
773
774 if (storageType.equals(StorageType.PERSISTED) ||
775 PluginContextLifecycleThreadLocal.isDestroying()) {
776
777 SchedulerException schedulerException = new SchedulerException();
778
779 schedulerException.setSwallowable(true);
780
781 throw schedulerException;
782 }
783 }
784
785 protected void slaveToMaster() throws SchedulerException {
786 boolean forceSync = ProxyModeThreadLocal.isForceSync();
787
788 ProxyModeThreadLocal.setForceSync(true);
789
790 _writeLock.lock();
791
792 try {
793 for (ObjectValuePair<SchedulerResponse, TriggerState>
794 memoryClusteredJob : _memoryClusteredJobs.values()) {
795
796 SchedulerResponse schedulerResponse =
797 memoryClusteredJob.getKey();
798
799 _schedulerEngine.schedule(
800 schedulerResponse.getTrigger(),
801 schedulerResponse.getDescription(),
802 schedulerResponse.getDestinationName(),
803 schedulerResponse.getMessage());
804
805 TriggerState triggerState = memoryClusteredJob.getValue();
806
807 if (triggerState.equals(TriggerState.PAUSED)) {
808 _schedulerEngine.pause(
809 schedulerResponse.getJobName(),
810 schedulerResponse.getGroupName());
811 }
812 }
813
814 _memoryClusteredJobs.clear();
815 }
816 finally {
817 ProxyModeThreadLocal.setForceSync(forceSync);
818
819 _master = true;
820
821 _writeLock.unlock();
822 }
823 }
824
825 protected void updateMemoryClusteredJob(
826 String jobName, String groupName, TriggerState triggerState) {
827
828 ObjectValuePair<SchedulerResponse, TriggerState>
829 memoryClusteredJob = _memoryClusteredJobs.get(
830 getFullName(jobName, groupName));
831
832 if (memoryClusteredJob != null) {
833 memoryClusteredJob.setValue(triggerState);
834 }
835 }
836
837 protected void updateMemoryClusteredJobs(
838 String groupName, TriggerState triggerState) {
839
840 for (ObjectValuePair<SchedulerResponse, TriggerState>
841 memoryClusteredJob : _memoryClusteredJobs.values()) {
842
843 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
844
845 if (groupName.equals(schedulerResponse.getGroupName())) {
846 memoryClusteredJob.setValue(triggerState);
847 }
848 }
849 }
850
851 @BeanReference(
852 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
853 protected SchedulerEngine schedulerEngine;
854
855 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
856 throws SchedulerException {
857
858 _writeLock.lock();
859
860 try {
861 for (SchedulerResponse schedulerResponse :
862 _schedulerEngine.getScheduledJobs()) {
863
864 if (StorageType.MEMORY_CLUSTERED ==
865 schedulerResponse.getStorageType()) {
866
867 String groupName = StorageType.MEMORY_CLUSTERED.toString();
868
869 groupName = groupName.concat(StringPool.POUND).concat(
870 schedulerResponse.getGroupName());
871
872 _schedulerEngine.delete(
873 schedulerResponse.getJobName(), groupName);
874 }
875 }
876
877 initMemoryClusteredJobs(schedulerResponses);
878
879 if (_log.isInfoEnabled()) {
880 _log.info("Switched current node from master to slave");
881 }
882 }
883 catch (Exception e) {
884 throw new SchedulerException(e);
885 }
886 finally {
887 _master = false;
888
889 _writeLock.unlock();
890 }
891 }
892
893 private static final String _LOCK_CLASS_NAME =
894 SchedulerEngine.class.getName();
895
896 private static Log _log = LogFactoryUtil.getLog(
897 ClusterSchedulerEngine.class);
898
899 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
900 SchedulerEngine.class.getName(), "getScheduledJob", String.class,
901 String.class);
902 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
903 SchedulerEngine.class.getName(), "getScheduledJobs");
904 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
905 SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
906 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
907 SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
908 StorageType.class);
909
910 private String _beanIdentifier;
911 private ClusterEventListener _clusterEventListener;
912 private volatile String _localClusterNodeAddress;
913 private volatile boolean _master;
914 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
915 _memoryClusteredJobs = new ConcurrentHashMap
916 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
917 private java.util.concurrent.locks.Lock _readLock;
918 private SchedulerEngine _schedulerEngine;
919 private java.util.concurrent.locks.Lock _writeLock;
920
921 private class MemorySchedulerClusterEventListener
922 implements ClusterEventListener {
923
924 public void processClusterEvent(ClusterEvent clusterEvent) {
925 try {
926 getMasterAddressString(true);
927 }
928 catch (Exception e) {
929 _log.error("Unable to update memory scheduler cluster lock", e);
930 }
931 }
932
933 }
934
935 private class MemorySchedulerClusterResponseCallback
936 extends BaseClusterResponseCallback {
937
938 public MemorySchedulerClusterResponseCallback(Address address) {
939 _address = address;
940 }
941
942 @Override
943 public void callback(ClusterNodeResponses clusterNodeResponses) {
944 try {
945 ClusterNodeResponse clusterNodeResponse =
946 clusterNodeResponses.getClusterResponse(_address);
947
948 List<SchedulerResponse> schedulerResponses =
949 (List<SchedulerResponse>)clusterNodeResponse.getResult();
950
951 _doMasterToSlave(schedulerResponses);
952 }
953 catch (Exception e) {
954 _log.error(
955 "Unable to load memory clustered jobs from cluster node " +
956 _address.getDescription(),
957 e);
958 }
959 }
960
961 @Override
962 public void processTimeoutException(TimeoutException timeoutException) {
963 _log.error(
964 "Unable to load memory clustered jobs from cluster node " +
965 _address.getDescription(),
966 timeoutException);
967 }
968
969 private Address _address;
970
971 }
972
973 }