001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018 import com.liferay.portal.kernel.bean.BeanReference;
019 import com.liferay.portal.kernel.bean.IdentifiableBean;
020 import com.liferay.portal.kernel.cluster.Address;
021 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
022 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023 import com.liferay.portal.kernel.cluster.ClusterEvent;
024 import com.liferay.portal.kernel.cluster.ClusterEventListener;
025 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
028 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
029 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
030 import com.liferay.portal.kernel.cluster.ClusterRequest;
031 import com.liferay.portal.kernel.cluster.Clusterable;
032 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Message;
036 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
037 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
038 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
039 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
040 import com.liferay.portal.kernel.scheduler.SchedulerException;
041 import com.liferay.portal.kernel.scheduler.StorageType;
042 import com.liferay.portal.kernel.scheduler.Trigger;
043 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
044 import com.liferay.portal.kernel.scheduler.TriggerState;
045 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
046 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
047 import com.liferay.portal.kernel.util.CharPool;
048 import com.liferay.portal.kernel.util.MethodHandler;
049 import com.liferay.portal.kernel.util.MethodKey;
050 import com.liferay.portal.kernel.util.ObjectValuePair;
051 import com.liferay.portal.kernel.util.StringPool;
052 import com.liferay.portal.model.Lock;
053 import com.liferay.portal.service.LockLocalServiceUtil;
054 import com.liferay.portal.util.PropsValues;
055
056 import java.io.Serializable;
057
058 import java.util.Iterator;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Set;
062 import java.util.concurrent.ConcurrentHashMap;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.TimeoutException;
065 import java.util.concurrent.locks.ReadWriteLock;
066 import java.util.concurrent.locks.ReentrantReadWriteLock;
067
068
071 public class ClusterSchedulerEngine
072 implements IdentifiableBean, SchedulerEngine,
073 SchedulerEngineClusterManager {
074
075 public static SchedulerEngine createClusterSchedulerEngine(
076 SchedulerEngine schedulerEngine) {
077
078 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080 }
081
082 return schedulerEngine;
083 }
084
085 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
086 _schedulerEngine = schedulerEngine;
087 }
088
089 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
090 @Override
091 public void delete(String groupName) throws SchedulerException {
092 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
093
094 _readLock.lock();
095
096 try {
097 if (memoryClusteredSlaveJob) {
098 removeMemoryClusteredJobs(groupName);
099 }
100 else {
101 _schedulerEngine.delete(groupName);
102 }
103 }
104 finally {
105 _readLock.unlock();
106 }
107
108 setClusterableThreadLocal(groupName);
109 }
110
111 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
112 @Override
113 public void delete(String jobName, String groupName)
114 throws SchedulerException {
115
116 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
117
118 _readLock.lock();
119
120 try {
121 if (memoryClusteredSlaveJob) {
122 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
123 }
124 else {
125 _schedulerEngine.delete(jobName, groupName);
126 }
127 }
128 finally {
129 _readLock.unlock();
130 }
131
132 setClusterableThreadLocal(groupName);
133 }
134
135 @Override
136 public String getBeanIdentifier() {
137 return _beanIdentifier;
138 }
139
140 @Override
141 public SchedulerResponse getScheduledJob(String jobName, String groupName)
142 throws SchedulerException {
143
144 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
145 groupName);
146
147 StorageType storageType = objectValuePair.getValue();
148
149 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
150 String masterAddressString = getMasterAddressString(false);
151
152 if (!_localClusterNodeAddress.equals(masterAddressString)) {
153 return callMaster(
154 masterAddressString, _getScheduledJobMethodKey, jobName,
155 objectValuePair.getKey(), storageType);
156 }
157 }
158
159 _readLock.lock();
160
161 try {
162 return _schedulerEngine.getScheduledJob(jobName, groupName);
163 }
164 finally {
165 _readLock.unlock();
166 }
167 }
168
169 @Override
170 public List<SchedulerResponse> getScheduledJobs()
171 throws SchedulerException {
172
173 String masterAddressString = getMasterAddressString(false);
174
175 if (!_localClusterNodeAddress.equals(masterAddressString)) {
176 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
177 }
178
179 _readLock.lock();
180
181 try {
182 return _schedulerEngine.getScheduledJobs();
183 }
184 finally {
185 _readLock.unlock();
186 }
187 }
188
189 @Override
190 public List<SchedulerResponse> getScheduledJobs(String groupName)
191 throws SchedulerException {
192
193 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
194 groupName);
195
196 StorageType storageType = objectValuePair.getValue();
197
198 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
199 String masterAddressString = getMasterAddressString(false);
200
201 if (!_localClusterNodeAddress.equals(masterAddressString)) {
202 return callMaster(
203 masterAddressString, _getScheduledJobsMethodKey2,
204 objectValuePair.getKey(), storageType);
205 }
206 }
207
208 _readLock.lock();
209
210 try {
211 return _schedulerEngine.getScheduledJobs(groupName);
212 }
213 finally {
214 _readLock.unlock();
215 }
216 }
217
218 @Override
219 public void initialize() throws SchedulerException {
220 try {
221 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
222
223 _readLock = readWriteLock.readLock();
224 _writeLock = readWriteLock.writeLock();
225
226 _localClusterNodeAddress = AddressSerializerUtil.serialize(
227 ClusterExecutorUtil.getLocalClusterNodeAddress());
228
229 _clusterEventListener = new MemorySchedulerClusterEventListener();
230
231 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
232
233 String masterAddressString = getMasterAddressString(false);
234
235 if (!_localClusterNodeAddress.equals(masterAddressString)) {
236 List<SchedulerResponse> schedulerResponses = callMaster(
237 masterAddressString, _getScheduledJobsMethodKey3,
238 StorageType.MEMORY_CLUSTERED);
239
240 initMemoryClusteredJobs(schedulerResponses);
241 }
242 }
243 catch (Exception e) {
244 throw new SchedulerException("Unable to initialize scheduler", e);
245 }
246 }
247
248 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
249 @Override
250 public void pause(String groupName) throws SchedulerException {
251 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
252
253 _readLock.lock();
254
255 try {
256 if (memoryClusteredSlaveJob) {
257 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
258 }
259 else {
260 _schedulerEngine.pause(groupName);
261 }
262 }
263 finally {
264 _readLock.unlock();
265 }
266
267 setClusterableThreadLocal(groupName);
268 }
269
270 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
271 @Override
272 public void pause(String jobName, String groupName)
273 throws SchedulerException {
274
275 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
276
277 _readLock.lock();
278
279 try {
280 if (memoryClusteredSlaveJob) {
281 updateMemoryClusteredJob(
282 jobName, groupName, TriggerState.PAUSED);
283 }
284 else {
285 _schedulerEngine.pause(jobName, groupName);
286 }
287 }
288 finally {
289 _readLock.unlock();
290 }
291
292 setClusterableThreadLocal(groupName);
293 }
294
295 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
296 @Override
297 public void resume(String groupName) throws SchedulerException {
298 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
299
300 _readLock.lock();
301
302 try {
303 if (memoryClusteredSlaveJob) {
304 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
305 }
306 else {
307 _schedulerEngine.resume(groupName);
308 }
309 }
310 finally {
311 _readLock.unlock();
312 }
313
314 setClusterableThreadLocal(groupName);
315 }
316
317 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
318 @Override
319 public void resume(String jobName, String groupName)
320 throws SchedulerException {
321
322 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
323
324 _readLock.lock();
325
326 try {
327 if (memoryClusteredSlaveJob) {
328 updateMemoryClusteredJob(
329 jobName, groupName, TriggerState.NORMAL);
330 }
331 else {
332 _schedulerEngine.resume(jobName, groupName);
333 }
334 }
335 finally {
336 _readLock.unlock();
337 }
338
339 setClusterableThreadLocal(groupName);
340 }
341
342 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
343 @Override
344 public void schedule(
345 Trigger trigger, String description, String destinationName,
346 Message message)
347 throws SchedulerException {
348
349 String groupName = trigger.getGroupName();
350 String jobName = trigger.getJobName();
351
352 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
353
354 _readLock.lock();
355
356 try {
357 if (memoryClusteredSlaveJob) {
358 SchedulerResponse schedulerResponse = new SchedulerResponse();
359
360 schedulerResponse.setDescription(description);
361 schedulerResponse.setDestinationName(destinationName);
362 schedulerResponse.setGroupName(groupName);
363 schedulerResponse.setJobName(jobName);
364 schedulerResponse.setMessage(message);
365 schedulerResponse.setTrigger(trigger);
366
367 _memoryClusteredJobs.put(
368 getFullName(jobName, groupName),
369 new ObjectValuePair<SchedulerResponse, TriggerState>(
370 schedulerResponse, TriggerState.NORMAL));
371 }
372 else {
373 _schedulerEngine.schedule(
374 trigger, description, destinationName, message);
375 }
376 }
377 finally {
378 _readLock.unlock();
379 }
380
381 setClusterableThreadLocal(groupName);
382 }
383
384 @Override
385 public void setBeanIdentifier(String beanIdentifier) {
386 _beanIdentifier = beanIdentifier;
387 }
388
389 @Override
390 public void shutdown() throws SchedulerException {
391 _portalReady = false;
392
393 try {
394 ClusterExecutorUtil.removeClusterEventListener(
395 _clusterEventListener);
396
397 LockLocalServiceUtil.unlock(
398 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
399 }
400 catch (Exception e) {
401 throw new SchedulerException("Unable to shutdown scheduler", e);
402 }
403
404 _schedulerEngine.shutdown();
405 }
406
407 @Override
408 public void start() throws SchedulerException {
409 _schedulerEngine.start();
410
411 _portalReady = true;
412 }
413
414 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
415 @Override
416 public void suppressError(String jobName, String groupName)
417 throws SchedulerException {
418
419 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420
421 if (!memoryClusteredSlaveJob) {
422 _readLock.lock();
423
424 try {
425 _schedulerEngine.suppressError(jobName, groupName);
426 }
427 finally {
428 _readLock.unlock();
429 }
430 }
431
432 setClusterableThreadLocal(groupName);
433 }
434
435 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
436 @Override
437 public void unschedule(String groupName) throws SchedulerException {
438 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
439
440 _readLock.lock();
441
442 try {
443 if (memoryClusteredSlaveJob) {
444 removeMemoryClusteredJobs(groupName);
445 }
446 else {
447 _schedulerEngine.unschedule(groupName);
448 }
449 }
450 finally {
451 _readLock.unlock();
452 }
453
454 setClusterableThreadLocal(groupName);
455 }
456
457 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
458 @Override
459 public void unschedule(String jobName, String groupName)
460 throws SchedulerException {
461
462 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
463
464 _readLock.lock();
465
466 try {
467 if (memoryClusteredSlaveJob) {
468 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
469 }
470 else {
471 _schedulerEngine.unschedule(jobName, groupName);
472 }
473 }
474 finally {
475 _readLock.unlock();
476 }
477
478 setClusterableThreadLocal(groupName);
479 }
480
481 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
482 @Override
483 public void update(Trigger trigger) throws SchedulerException {
484 String jobName = trigger.getJobName();
485 String groupName = trigger.getGroupName();
486
487 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
488
489 _readLock.lock();
490
491 try {
492 if (memoryClusteredSlaveJob) {
493 boolean updated = false;
494
495 for (ObjectValuePair<SchedulerResponse, TriggerState>
496 memoryClusteredJob : _memoryClusteredJobs.values()) {
497
498 SchedulerResponse schedulerResponse =
499 memoryClusteredJob.getKey();
500
501 if (jobName.equals(schedulerResponse.getJobName()) &&
502 groupName.equals(schedulerResponse.getGroupName())) {
503
504 schedulerResponse.setTrigger(trigger);
505
506 updated = true;
507
508 break;
509 }
510 }
511
512 if (!updated) {
513 throw new SchedulerException(
514 "Unable to update trigger for memory clustered job");
515 }
516 }
517 else {
518 _schedulerEngine.update(trigger);
519 }
520 }
521 finally {
522 _readLock.unlock();
523 }
524
525 setClusterableThreadLocal(groupName);
526 }
527
528 @Override
529 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
530 getMasterAddressString(false);
531
532 return null;
533 }
534
535 protected <T> T callMaster(
536 String masterAddressString, MethodKey methodKey,
537 Object... arguments)
538 throws SchedulerException {
539
540 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
541
542 Address address = AddressSerializerUtil.deserialize(
543 masterAddressString);
544
545 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
546 methodHandler, address);
547
548 try {
549 FutureClusterResponses futureClusterResponses =
550 ClusterExecutorUtil.execute(clusterRequest);
551
552 ClusterNodeResponses clusterNodeResponses =
553 futureClusterResponses.get(20, TimeUnit.SECONDS);
554
555 ClusterNodeResponse clusterNodeResponse =
556 clusterNodeResponses.getClusterResponse(address);
557
558 return (T)clusterNodeResponse.getResult();
559 }
560 catch (Exception e) {
561 throw new SchedulerException(
562 "Unable to load scheduled jobs from cluster node " +
563 address.getDescription(),
564 e);
565 }
566 }
567
568 protected String getFullName(String jobName, String groupName) {
569 return groupName.concat(StringPool.PERIOD).concat(jobName);
570 }
571
572 protected String getMasterAddressString(boolean asynchronous)
573 throws SchedulerException {
574
575 String owner = null;
576
577 while (true) {
578 try {
579 Lock lock = null;
580
581 if (owner == null) {
582 lock = LockLocalServiceUtil.lock(
583 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
584 _localClusterNodeAddress);
585 }
586 else {
587 lock = LockLocalServiceUtil.lock(
588 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
589 _localClusterNodeAddress);
590 }
591
592 owner = lock.getOwner();
593
594 Address address = AddressSerializerUtil.deserialize(owner);
595
596 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
597 break;
598 }
599 }
600 catch (Exception e) {
601 if (_log.isWarnEnabled()) {
602 _log.warn(
603 "Unable to obtain memory scheduler cluster lock. " +
604 "Trying again.");
605 }
606 }
607 }
608
609 boolean master = _localClusterNodeAddress.equals(owner);
610
611 if (master == _master) {
612 return owner;
613 }
614
615 if (master) {
616 slaveToMaster();
617 }
618 else {
619 masterToSlave(owner, asynchronous);
620 }
621
622 return owner;
623 }
624
625 protected void initMemoryClusteredJobs(
626 List<SchedulerResponse> schedulerResponses)
627 throws Exception {
628
629 for (SchedulerResponse schedulerResponse : schedulerResponses) {
630 Trigger oldTrigger = schedulerResponse.getTrigger();
631
632 String jobName = schedulerResponse.getJobName();
633 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
634 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
635
636 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
637 oldTrigger.getTriggerType(), jobName, groupName,
638 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
639 oldTrigger.getTriggerContent());
640
641 schedulerResponse.setTrigger(newTrigger);
642
643 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
644 schedulerResponse);
645
646 Message message = schedulerResponse.getMessage();
647
648 message.remove(JOB_STATE);
649
650 _memoryClusteredJobs.put(
651 getFullName(jobName, groupName),
652 new ObjectValuePair<SchedulerResponse, TriggerState>(
653 schedulerResponse, triggerState));
654 }
655 }
656
657 protected boolean isMemoryClusteredSlaveJob(String groupName)
658 throws SchedulerException {
659
660 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
661 groupName);
662
663 StorageType storageType = objectValuePair.getValue();
664
665 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
666 return false;
667 }
668
669 String masterAddressString = getMasterAddressString(false);
670
671 if (_localClusterNodeAddress.equals(masterAddressString)) {
672 return false;
673 }
674
675 return true;
676 }
677
678 protected void masterToSlave(
679 String masterAddressString, boolean asynchronous)
680 throws SchedulerException {
681
682 if (asynchronous) {
683 MethodHandler methodHandler = new MethodHandler(
684 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
685
686 Address address = AddressSerializerUtil.deserialize(
687 masterAddressString);
688
689 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
690 methodHandler, address);
691
692 try {
693 ClusterExecutorUtil.execute(
694 clusterRequest,
695 new MemorySchedulerClusterResponseCallback(address), 20,
696 TimeUnit.SECONDS);
697
698 return;
699 }
700 catch (Exception e) {
701 throw new SchedulerException(
702 "Unable to load scheduled jobs from cluster node " +
703 address.getDescription(),
704 e);
705 }
706 }
707
708 List<SchedulerResponse> schedulerResponses = callMaster(
709 masterAddressString, _getScheduledJobsMethodKey3,
710 StorageType.MEMORY_CLUSTERED);
711
712 _doMasterToSlave(schedulerResponses);
713 }
714
715 protected void removeMemoryClusteredJobs(String groupName) {
716 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
717 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
718
719 Iterator
720 <Map.Entry<String,
721 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
722 memoryClusteredJobs.iterator();
723
724 while (itr.hasNext()) {
725 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
726 entry = itr.next();
727
728 ObjectValuePair<SchedulerResponse, TriggerState>
729 memoryClusteredJob = entry.getValue();
730
731 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
732
733 if (groupName.equals(schedulerResponse.getGroupName())) {
734 itr.remove();
735 }
736 }
737 }
738
739 protected ObjectValuePair<String, StorageType> resolveGroupName(
740 String groupName) {
741
742 int index = groupName.indexOf(CharPool.POUND);
743
744 String storageTypeString = groupName.substring(0, index);
745
746 StorageType storageType = StorageType.valueOf(storageTypeString);
747
748 String orginalGroupName = groupName.substring(index + 1);
749
750 return new ObjectValuePair<String, StorageType>(
751 orginalGroupName, storageType);
752 }
753
754 protected void setClusterableThreadLocal(String groupName) {
755 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
756 groupName);
757
758 ClusterableContextThreadLocal.putThreadLocalContext(
759 STORAGE_TYPE, objectValuePair.getValue());
760 ClusterableContextThreadLocal.putThreadLocalContext(
761 _PORTAL_READY, _portalReady);
762
763 boolean pluginReady = true;
764
765 if (PluginContextLifecycleThreadLocal.isInitializing() ||
766 PluginContextLifecycleThreadLocal.isDestroying()) {
767
768 pluginReady = false;
769 }
770
771 ClusterableContextThreadLocal.putThreadLocalContext(
772 _PLUGIN_READY, pluginReady);
773 }
774
775 protected void slaveToMaster() throws SchedulerException {
776 boolean forceSync = ProxyModeThreadLocal.isForceSync();
777
778 ProxyModeThreadLocal.setForceSync(true);
779
780 _writeLock.lock();
781
782 try {
783 for (ObjectValuePair<SchedulerResponse, TriggerState>
784 memoryClusteredJob : _memoryClusteredJobs.values()) {
785
786 SchedulerResponse schedulerResponse =
787 memoryClusteredJob.getKey();
788
789 _schedulerEngine.schedule(
790 schedulerResponse.getTrigger(),
791 schedulerResponse.getDescription(),
792 schedulerResponse.getDestinationName(),
793 schedulerResponse.getMessage());
794
795 TriggerState triggerState = memoryClusteredJob.getValue();
796
797 if (triggerState.equals(TriggerState.PAUSED)) {
798 _schedulerEngine.pause(
799 schedulerResponse.getJobName(),
800 schedulerResponse.getGroupName());
801 }
802 }
803
804 _memoryClusteredJobs.clear();
805 }
806 finally {
807 ProxyModeThreadLocal.setForceSync(forceSync);
808
809 _master = true;
810
811 _writeLock.unlock();
812 }
813 }
814
815 protected void updateMemoryClusteredJob(
816 String jobName, String groupName, TriggerState triggerState) {
817
818 ObjectValuePair<SchedulerResponse, TriggerState>
819 memoryClusteredJob = _memoryClusteredJobs.get(
820 getFullName(jobName, groupName));
821
822 if (memoryClusteredJob != null) {
823 memoryClusteredJob.setValue(triggerState);
824 }
825 }
826
827 protected void updateMemoryClusteredJobs(
828 String groupName, TriggerState triggerState) {
829
830 for (ObjectValuePair<SchedulerResponse, TriggerState>
831 memoryClusteredJob : _memoryClusteredJobs.values()) {
832
833 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
834
835 if (groupName.equals(schedulerResponse.getGroupName())) {
836 memoryClusteredJob.setValue(triggerState);
837 }
838 }
839 }
840
841 @BeanReference(
842 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
843 protected SchedulerEngine schedulerEngine;
844
845 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
846 throws SchedulerException {
847
848 _writeLock.lock();
849
850 try {
851 for (SchedulerResponse schedulerResponse :
852 _schedulerEngine.getScheduledJobs()) {
853
854 if (StorageType.MEMORY_CLUSTERED ==
855 schedulerResponse.getStorageType()) {
856
857 String groupName = StorageType.MEMORY_CLUSTERED.toString();
858
859 groupName = groupName.concat(StringPool.POUND).concat(
860 schedulerResponse.getGroupName());
861
862 _schedulerEngine.delete(
863 schedulerResponse.getJobName(), groupName);
864 }
865 }
866
867 initMemoryClusteredJobs(schedulerResponses);
868
869 if (_log.isInfoEnabled()) {
870 _log.info("Switched current node from master to slave");
871 }
872 }
873 catch (Exception e) {
874 throw new SchedulerException(e);
875 }
876 finally {
877 _master = false;
878
879 _writeLock.unlock();
880 }
881 }
882
883 private static final String _LOCK_CLASS_NAME =
884 SchedulerEngine.class.getName();
885
886 private static final String _PLUGIN_READY = "plugin.ready";
887
888 private static final String _PORTAL_READY = "portal.ready";
889
890 private static Log _log = LogFactoryUtil.getLog(
891 ClusterSchedulerEngine.class);
892
893 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
894 SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
895 String.class, StorageType.class);
896 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
897 SchedulerEngineHelperUtil.class, "getScheduledJobs");
898 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
899 SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
900 StorageType.class);
901 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
902 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
903
904 private String _beanIdentifier;
905 private ClusterEventListener _clusterEventListener;
906 private volatile String _localClusterNodeAddress;
907 private volatile boolean _master;
908 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
909 _memoryClusteredJobs = new ConcurrentHashMap
910 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
911 private boolean _portalReady;
912 private java.util.concurrent.locks.Lock _readLock;
913 private SchedulerEngine _schedulerEngine;
914 private java.util.concurrent.locks.Lock _writeLock;
915
916 private static class SchedulerClusterInvokeAcceptor
917 implements ClusterInvokeAcceptor {
918
919 @Override
920 public boolean accept(Map<String, Serializable> context) {
921 if (ClusterInvokeThreadLocal.isEnabled()) {
922 return true;
923 }
924
925 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
926 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
927 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
928
929 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
930 !pluginReady) {
931
932 return false;
933 }
934
935 return true;
936 }
937
938 }
939
940 private class MemorySchedulerClusterEventListener
941 implements ClusterEventListener {
942
943 @Override
944 public void processClusterEvent(ClusterEvent clusterEvent) {
945 try {
946 getMasterAddressString(true);
947 }
948 catch (Exception e) {
949 _log.error("Unable to update memory scheduler cluster lock", e);
950 }
951 }
952
953 }
954
955 private class MemorySchedulerClusterResponseCallback
956 extends BaseClusterResponseCallback {
957
958 public MemorySchedulerClusterResponseCallback(Address address) {
959 _address = address;
960 }
961
962 @Override
963 public void callback(ClusterNodeResponses clusterNodeResponses) {
964 try {
965 ClusterNodeResponse clusterNodeResponse =
966 clusterNodeResponses.getClusterResponse(_address);
967
968 List<SchedulerResponse> schedulerResponses =
969 (List<SchedulerResponse>)clusterNodeResponse.getResult();
970
971 _doMasterToSlave(schedulerResponses);
972 }
973 catch (Exception e) {
974 _log.error(
975 "Unable to load memory clustered jobs from cluster node " +
976 _address.getDescription(),
977 e);
978 }
979 }
980
981 @Override
982 public void processTimeoutException(TimeoutException timeoutException) {
983 _log.error(
984 "Unable to load memory clustered jobs from cluster node " +
985 _address.getDescription(),
986 timeoutException);
987 }
988
989 private Address _address;
990
991 }
992
993 }