001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019 import com.liferay.portal.kernel.bean.BeanReference;
020 import com.liferay.portal.kernel.bean.IdentifiableBean;
021 import com.liferay.portal.kernel.cluster.Address;
022 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023 import com.liferay.portal.kernel.cluster.ClusterEvent;
024 import com.liferay.portal.kernel.cluster.ClusterEventListener;
025 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029 import com.liferay.portal.kernel.cluster.ClusterRequest;
030 import com.liferay.portal.kernel.cluster.Clusterable;
031 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034 import com.liferay.portal.kernel.log.Log;
035 import com.liferay.portal.kernel.log.LogFactoryUtil;
036 import com.liferay.portal.kernel.messaging.Message;
037 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041 import com.liferay.portal.kernel.scheduler.SchedulerException;
042 import com.liferay.portal.kernel.scheduler.StorageType;
043 import com.liferay.portal.kernel.scheduler.Trigger;
044 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045 import com.liferay.portal.kernel.scheduler.TriggerState;
046 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048 import com.liferay.portal.kernel.util.Base64;
049 import com.liferay.portal.kernel.util.CharPool;
050 import com.liferay.portal.kernel.util.MethodHandler;
051 import com.liferay.portal.kernel.util.MethodKey;
052 import com.liferay.portal.kernel.util.ObjectValuePair;
053 import com.liferay.portal.kernel.util.StringPool;
054 import com.liferay.portal.model.Lock;
055 import com.liferay.portal.service.LockLocalServiceUtil;
056 import com.liferay.portal.util.PropsValues;
057
058 import java.io.ObjectInputStream;
059 import java.io.ObjectOutputStream;
060 import java.io.Serializable;
061
062 import java.util.Iterator;
063 import java.util.List;
064 import java.util.Map;
065 import java.util.Set;
066 import java.util.concurrent.ConcurrentHashMap;
067 import java.util.concurrent.TimeUnit;
068 import java.util.concurrent.TimeoutException;
069 import java.util.concurrent.locks.ReadWriteLock;
070 import java.util.concurrent.locks.ReentrantReadWriteLock;
071
072
075 public class ClusterSchedulerEngine
076 implements IdentifiableBean, SchedulerEngine,
077 SchedulerEngineClusterManager {
078
079 public static SchedulerEngine createClusterSchedulerEngine(
080 SchedulerEngine schedulerEngine) {
081
082 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084 }
085
086 return schedulerEngine;
087 }
088
089 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
090 _schedulerEngine = schedulerEngine;
091 }
092
093 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094 public void delete(String groupName) throws SchedulerException {
095 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
096
097 _readLock.lock();
098
099 try {
100 if (memoryClusteredSlaveJob) {
101 removeMemoryClusteredJobs(groupName);
102 }
103 else {
104 _schedulerEngine.delete(groupName);
105 }
106 }
107 finally {
108 _readLock.unlock();
109 }
110
111 setClusterableThreadLocal(groupName);
112 }
113
114 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
115 public void delete(String jobName, String groupName)
116 throws SchedulerException {
117
118 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
119
120 _readLock.lock();
121
122 try {
123 if (memoryClusteredSlaveJob) {
124 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
125 }
126 else {
127 _schedulerEngine.delete(jobName, groupName);
128 }
129 }
130 finally {
131 _readLock.unlock();
132 }
133
134 setClusterableThreadLocal(groupName);
135 }
136
137 public String getBeanIdentifier() {
138 return _beanIdentifier;
139 }
140
141 public SchedulerResponse getScheduledJob(String jobName, String groupName)
142 throws SchedulerException {
143
144 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
145 groupName);
146
147 StorageType storageType = objectValuePair.getValue();
148
149 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
150 String masterAddressString = getMasterAddressString(false);
151
152 if (!_localClusterNodeAddress.equals(masterAddressString)) {
153 return (SchedulerResponse)callMaster(
154 masterAddressString, _getScheduledJobMethodKey, jobName,
155 objectValuePair.getKey(), storageType);
156 }
157 }
158
159 _readLock.lock();
160
161 try {
162 return _schedulerEngine.getScheduledJob(jobName, groupName);
163 }
164 finally {
165 _readLock.unlock();
166 }
167 }
168
169 public List<SchedulerResponse> getScheduledJobs()
170 throws SchedulerException {
171
172 String masterAddressString = getMasterAddressString(false);
173
174 if (!_localClusterNodeAddress.equals(masterAddressString)) {
175 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
176 }
177
178 _readLock.lock();
179
180 try {
181 return _schedulerEngine.getScheduledJobs();
182 }
183 finally {
184 _readLock.unlock();
185 }
186 }
187
188 public List<SchedulerResponse> getScheduledJobs(String groupName)
189 throws SchedulerException {
190
191 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
192 groupName);
193
194 StorageType storageType = objectValuePair.getValue();
195
196 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
197 String masterAddressString = getMasterAddressString(false);
198
199 if (!_localClusterNodeAddress.equals(masterAddressString)) {
200 return callMaster(
201 masterAddressString, _getScheduledJobsMethodKey2,
202 objectValuePair.getKey(), storageType);
203 }
204 }
205
206 _readLock.lock();
207
208 try {
209 return _schedulerEngine.getScheduledJobs(groupName);
210 }
211 finally {
212 _readLock.unlock();
213 }
214 }
215
216 public void initialize() throws SchedulerException {
217 try {
218 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
219
220 _readLock = readWriteLock.readLock();
221 _writeLock = readWriteLock.writeLock();
222
223 _localClusterNodeAddress = getSerializedString(
224 ClusterExecutorUtil.getLocalClusterNodeAddress());
225
226 _clusterEventListener = new MemorySchedulerClusterEventListener();
227
228 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
229
230 String masterAddressString = getMasterAddressString(false);
231
232 if (!_localClusterNodeAddress.equals(masterAddressString)) {
233 List<SchedulerResponse> schedulerResponses = callMaster(
234 masterAddressString, _getScheduledJobsMethodKey3,
235 StorageType.MEMORY_CLUSTERED);
236
237 initMemoryClusteredJobs(schedulerResponses);
238 }
239 }
240 catch (Exception e) {
241 throw new SchedulerException("Unable to initialize scheduler", e);
242 }
243 }
244
245 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
246 public void pause(String groupName) throws SchedulerException {
247 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
248
249 _readLock.lock();
250
251 try {
252 if (memoryClusteredSlaveJob) {
253 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
254 }
255 else {
256 _schedulerEngine.pause(groupName);
257 }
258 }
259 finally {
260 _readLock.unlock();
261 }
262
263 setClusterableThreadLocal(groupName);
264 }
265
266 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
267 public void pause(String jobName, String groupName)
268 throws SchedulerException {
269
270 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
271
272 _readLock.lock();
273
274 try {
275 if (memoryClusteredSlaveJob) {
276 updateMemoryClusteredJob(
277 jobName, groupName, TriggerState.PAUSED);
278 }
279 else {
280 _schedulerEngine.pause(jobName, groupName);
281 }
282 }
283 finally {
284 _readLock.unlock();
285 }
286
287 setClusterableThreadLocal(groupName);
288 }
289
290 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
291 public void resume(String groupName) throws SchedulerException {
292 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
293
294 _readLock.lock();
295
296 try {
297 if (memoryClusteredSlaveJob) {
298 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
299 }
300 else {
301 _schedulerEngine.resume(groupName);
302 }
303 }
304 finally {
305 _readLock.unlock();
306 }
307
308 setClusterableThreadLocal(groupName);
309 }
310
311 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
312 public void resume(String jobName, String groupName)
313 throws SchedulerException {
314
315 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
316
317 _readLock.lock();
318
319 try {
320 if (memoryClusteredSlaveJob) {
321 updateMemoryClusteredJob(
322 jobName, groupName, TriggerState.NORMAL);
323 }
324 else {
325 _schedulerEngine.resume(jobName, groupName);
326 }
327 }
328 finally {
329 _readLock.unlock();
330 }
331
332 setClusterableThreadLocal(groupName);
333 }
334
335 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
336 public void schedule(
337 Trigger trigger, String description, String destinationName,
338 Message message)
339 throws SchedulerException {
340
341 String groupName = trigger.getGroupName();
342 String jobName = trigger.getJobName();
343
344 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
345
346 _readLock.lock();
347
348 try {
349 if (memoryClusteredSlaveJob) {
350 SchedulerResponse schedulerResponse = new SchedulerResponse();
351
352 schedulerResponse.setDescription(description);
353 schedulerResponse.setDestinationName(destinationName);
354 schedulerResponse.setGroupName(groupName);
355 schedulerResponse.setJobName(jobName);
356 schedulerResponse.setMessage(message);
357 schedulerResponse.setTrigger(trigger);
358
359 _memoryClusteredJobs.put(
360 getFullName(jobName, groupName),
361 new ObjectValuePair<SchedulerResponse, TriggerState>(
362 schedulerResponse, TriggerState.NORMAL));
363 }
364 else {
365 _schedulerEngine.schedule(
366 trigger, description, destinationName, message);
367 }
368 }
369 finally {
370 _readLock.unlock();
371 }
372
373 setClusterableThreadLocal(groupName);
374 }
375
376 public void setBeanIdentifier(String beanIdentifier) {
377 _beanIdentifier = beanIdentifier;
378 }
379
380 public void shutdown() throws SchedulerException {
381 _portalReady = false;
382
383 try {
384 ClusterExecutorUtil.removeClusterEventListener(
385 _clusterEventListener);
386
387 LockLocalServiceUtil.unlock(
388 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
389 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
390 }
391 catch (Exception e) {
392 throw new SchedulerException("Unable to shutdown scheduler", e);
393 }
394
395 _schedulerEngine.shutdown();
396 }
397
398 public void start() throws SchedulerException {
399 _schedulerEngine.start();
400
401 _portalReady = true;
402 }
403
404 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
405 public void suppressError(String jobName, String groupName)
406 throws SchedulerException {
407
408 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
409
410 if (!memoryClusteredSlaveJob) {
411 _readLock.lock();
412
413 try {
414 _schedulerEngine.suppressError(jobName, groupName);
415 }
416 finally {
417 _readLock.unlock();
418 }
419 }
420
421 setClusterableThreadLocal(groupName);
422 }
423
424 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
425 public void unschedule(String groupName) throws SchedulerException {
426 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
427
428 _readLock.lock();
429
430 try {
431 if (memoryClusteredSlaveJob) {
432 removeMemoryClusteredJobs(groupName);
433 }
434 else {
435 _schedulerEngine.unschedule(groupName);
436 }
437 }
438 finally {
439 _readLock.unlock();
440 }
441
442 setClusterableThreadLocal(groupName);
443 }
444
445 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
446 public void unschedule(String jobName, String groupName)
447 throws SchedulerException {
448
449 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
450
451 _readLock.lock();
452
453 try {
454 if (memoryClusteredSlaveJob) {
455 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
456 }
457 else {
458 _schedulerEngine.unschedule(jobName, groupName);
459 }
460 }
461 finally {
462 _readLock.unlock();
463 }
464
465 setClusterableThreadLocal(groupName);
466 }
467
468 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
469 public void update(Trigger trigger) throws SchedulerException {
470 String jobName = trigger.getJobName();
471 String groupName = trigger.getGroupName();
472
473 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
474
475 _readLock.lock();
476
477 try {
478 if (memoryClusteredSlaveJob) {
479 boolean updated = false;
480
481 for (ObjectValuePair<SchedulerResponse, TriggerState>
482 memoryClusteredJob : _memoryClusteredJobs.values()) {
483
484 SchedulerResponse schedulerResponse =
485 memoryClusteredJob.getKey();
486
487 if (jobName.equals(schedulerResponse.getJobName()) &&
488 groupName.equals(schedulerResponse.getGroupName())) {
489
490 schedulerResponse.setTrigger(trigger);
491
492 updated = true;
493
494 break;
495 }
496 }
497
498 if (!updated) {
499 throw new SchedulerException(
500 "Unable to update trigger for memory clustered job");
501 }
502 }
503 else {
504 _schedulerEngine.update(trigger);
505 }
506 }
507 finally {
508 _readLock.unlock();
509 }
510
511 setClusterableThreadLocal(groupName);
512 }
513
514 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
515 getMasterAddressString(false);
516
517 return null;
518 }
519
520 protected <T> T callMaster(
521 String masterAddressString, MethodKey methodKey,
522 Object... arguments)
523 throws SchedulerException {
524
525 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
526
527 Address address = (Address)getDeserializedObject(masterAddressString);
528
529 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
530 methodHandler, address);
531
532 try {
533 FutureClusterResponses futureClusterResponses =
534 ClusterExecutorUtil.execute(clusterRequest);
535
536 ClusterNodeResponses clusterNodeResponses =
537 futureClusterResponses.get(20, TimeUnit.SECONDS);
538
539 ClusterNodeResponse clusterNodeResponse =
540 clusterNodeResponses.getClusterResponse(address);
541
542 return (T)clusterNodeResponse.getResult();
543 }
544 catch (Exception e) {
545 throw new SchedulerException(
546 "Unable to load scheduled jobs from cluster node " +
547 address.getDescription(),
548 e);
549 }
550 }
551
552 protected Object getDeserializedObject(String string)
553 throws SchedulerException {
554
555 byte[] bytes = Base64.decode(string);
556
557 UnsyncByteArrayInputStream byteArrayInputStream =
558 new UnsyncByteArrayInputStream(bytes);
559
560 ObjectInputStream objectInputStream = null;
561
562 try {
563 objectInputStream = new ObjectInputStream(byteArrayInputStream);
564
565 Object object = objectInputStream.readObject();
566
567 return object;
568 }
569 catch (Exception e) {
570 throw new SchedulerException(
571 "Unable to deserialize object from " + string, e);
572 }
573 finally {
574 try {
575 objectInputStream.close();
576 }
577 catch (Exception e) {
578 }
579 }
580 }
581
582 protected String getFullName(String jobName, String groupName) {
583 return groupName.concat(StringPool.PERIOD).concat(jobName);
584 }
585
586 protected String getMasterAddressString(boolean asynchronous)
587 throws SchedulerException {
588
589 String owner = null;
590
591 Lock lock = null;
592
593 while (true) {
594 try {
595 if (owner == null) {
596 lock = LockLocalServiceUtil.lock(
597 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
598 _localClusterNodeAddress,
599 PropsValues.
600 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
601 }
602 else {
603 lock = LockLocalServiceUtil.lock(
604 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
605 _localClusterNodeAddress,
606 PropsValues.
607 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
608 }
609
610 Address address = (Address)getDeserializedObject(
611 lock.getOwner());
612
613 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
614 break;
615 }
616 else {
617 owner = lock.getOwner();
618 }
619 }
620 catch (Exception e) {
621 if (_log.isWarnEnabled()) {
622 _log.warn(
623 "Unable to obtain memory scheduler cluster lock. " +
624 "Trying again.");
625 }
626 }
627 }
628
629 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
630
631 if (master == _master) {
632 return lock.getOwner();
633 }
634
635 if (master) {
636 slaveToMaster();
637 }
638 else {
639 masterToSlave(lock.getOwner(), asynchronous);
640 }
641
642 return lock.getOwner();
643 }
644
645 protected String getSerializedString(Object object) throws Exception {
646 UnsyncByteArrayOutputStream byteArrayOutputStream =
647 new UnsyncByteArrayOutputStream();
648
649 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
650 byteArrayOutputStream);
651
652 objectOutputStream.writeObject(object);
653 objectOutputStream.close();
654
655 byte[] bytes = byteArrayOutputStream.toByteArray();
656
657 return Base64.encode(bytes);
658 }
659
660 protected void initMemoryClusteredJobs(
661 List<SchedulerResponse> schedulerResponses)
662 throws Exception {
663
664 for (SchedulerResponse schedulerResponse : schedulerResponses) {
665 Trigger oldTrigger = schedulerResponse.getTrigger();
666
667 String jobName = schedulerResponse.getJobName();
668 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
669 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
670
671 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
672 oldTrigger.getTriggerType(), jobName, groupName,
673 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
674 oldTrigger.getTriggerContent());
675
676 schedulerResponse.setTrigger(newTrigger);
677
678 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
679 schedulerResponse);
680
681 Message message = schedulerResponse.getMessage();
682
683 message.remove(JOB_STATE);
684
685 _memoryClusteredJobs.put(
686 getFullName(jobName, groupName),
687 new ObjectValuePair<SchedulerResponse, TriggerState>(
688 schedulerResponse, triggerState));
689 }
690 }
691
692 protected boolean isMemoryClusteredSlaveJob(String groupName)
693 throws SchedulerException {
694
695 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
696 groupName);
697
698 StorageType storageType = objectValuePair.getValue();
699
700 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
701 return false;
702 }
703
704 String masterAddressString = getMasterAddressString(false);
705
706 if (_localClusterNodeAddress.equals(masterAddressString)) {
707 return false;
708 }
709
710 return true;
711 }
712
713 protected void masterToSlave(
714 String masterAddressString, boolean asynchronous)
715 throws SchedulerException {
716
717 if (asynchronous) {
718 MethodHandler methodHandler = new MethodHandler(
719 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
720
721 Address address = (Address)getDeserializedObject(
722 masterAddressString);
723
724 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
725 methodHandler, address);
726
727 try {
728 ClusterExecutorUtil.execute(
729 clusterRequest,
730 new MemorySchedulerClusterResponseCallback(address), 20,
731 TimeUnit.SECONDS);
732
733 return;
734 }
735 catch (Exception e) {
736 throw new SchedulerException(
737 "Unable to load scheduled jobs from cluster node " +
738 address.getDescription(),
739 e);
740 }
741 }
742
743 List<SchedulerResponse> schedulerResponses = callMaster(
744 masterAddressString, _getScheduledJobsMethodKey3,
745 StorageType.MEMORY_CLUSTERED);
746
747 _doMasterToSlave(schedulerResponses);
748 }
749
750 protected void removeMemoryClusteredJobs(String groupName) {
751 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
752 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
753
754 Iterator
755 <Map.Entry<String,
756 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
757 memoryClusteredJobs.iterator();
758
759 while (itr.hasNext()) {
760 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
761 entry = itr.next();
762
763 ObjectValuePair<SchedulerResponse, TriggerState>
764 memoryClusteredJob = entry.getValue();
765
766 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
767
768 if (groupName.equals(schedulerResponse.getGroupName())) {
769 itr.remove();
770 }
771 }
772 }
773
774 protected ObjectValuePair<String, StorageType> resolveGroupName(
775 String groupName) {
776
777 int index = groupName.indexOf(CharPool.POUND);
778
779 String storageTypeString = groupName.substring(0, index);
780
781 StorageType storageType = StorageType.valueOf(storageTypeString);
782
783 String orginalGroupName = groupName.substring(index + 1);
784
785 return new ObjectValuePair<String, StorageType>(
786 orginalGroupName, storageType);
787 }
788
789 protected void setClusterableThreadLocal(String groupName) {
790 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
791 groupName);
792
793 ClusterableContextThreadLocal.putThreadLocalContext(
794 STORAGE_TYPE, objectValuePair.getValue());
795 ClusterableContextThreadLocal.putThreadLocalContext(
796 _PORTAL_READY, _portalReady);
797
798 boolean pluginReady = true;
799
800 if (PluginContextLifecycleThreadLocal.isInitializing() ||
801 PluginContextLifecycleThreadLocal.isDestroying()) {
802
803 pluginReady = false;
804 }
805
806 ClusterableContextThreadLocal.putThreadLocalContext(
807 _PLUGIN_READY, pluginReady);
808 }
809
810 protected void slaveToMaster() throws SchedulerException {
811 boolean forceSync = ProxyModeThreadLocal.isForceSync();
812
813 ProxyModeThreadLocal.setForceSync(true);
814
815 _writeLock.lock();
816
817 try {
818 for (ObjectValuePair<SchedulerResponse, TriggerState>
819 memoryClusteredJob : _memoryClusteredJobs.values()) {
820
821 SchedulerResponse schedulerResponse =
822 memoryClusteredJob.getKey();
823
824 _schedulerEngine.schedule(
825 schedulerResponse.getTrigger(),
826 schedulerResponse.getDescription(),
827 schedulerResponse.getDestinationName(),
828 schedulerResponse.getMessage());
829
830 TriggerState triggerState = memoryClusteredJob.getValue();
831
832 if (triggerState.equals(TriggerState.PAUSED)) {
833 _schedulerEngine.pause(
834 schedulerResponse.getJobName(),
835 schedulerResponse.getGroupName());
836 }
837 }
838
839 _memoryClusteredJobs.clear();
840 }
841 finally {
842 ProxyModeThreadLocal.setForceSync(forceSync);
843
844 _master = true;
845
846 _writeLock.unlock();
847 }
848 }
849
850 protected void updateMemoryClusteredJob(
851 String jobName, String groupName, TriggerState triggerState) {
852
853 ObjectValuePair<SchedulerResponse, TriggerState>
854 memoryClusteredJob = _memoryClusteredJobs.get(
855 getFullName(jobName, groupName));
856
857 if (memoryClusteredJob != null) {
858 memoryClusteredJob.setValue(triggerState);
859 }
860 }
861
862 protected void updateMemoryClusteredJobs(
863 String groupName, TriggerState triggerState) {
864
865 for (ObjectValuePair<SchedulerResponse, TriggerState>
866 memoryClusteredJob : _memoryClusteredJobs.values()) {
867
868 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
869
870 if (groupName.equals(schedulerResponse.getGroupName())) {
871 memoryClusteredJob.setValue(triggerState);
872 }
873 }
874 }
875
876 @BeanReference(
877 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
878 protected SchedulerEngine schedulerEngine;
879
880 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
881 throws SchedulerException {
882
883 _writeLock.lock();
884
885 try {
886 for (SchedulerResponse schedulerResponse :
887 _schedulerEngine.getScheduledJobs()) {
888
889 if (StorageType.MEMORY_CLUSTERED ==
890 schedulerResponse.getStorageType()) {
891
892 String groupName = StorageType.MEMORY_CLUSTERED.toString();
893
894 groupName = groupName.concat(StringPool.POUND).concat(
895 schedulerResponse.getGroupName());
896
897 _schedulerEngine.delete(
898 schedulerResponse.getJobName(), groupName);
899 }
900 }
901
902 initMemoryClusteredJobs(schedulerResponses);
903
904 if (_log.isInfoEnabled()) {
905 _log.info("Switched current node from master to slave");
906 }
907 }
908 catch (Exception e) {
909 throw new SchedulerException(e);
910 }
911 finally {
912 _master = false;
913
914 _writeLock.unlock();
915 }
916 }
917
918 private static final String _LOCK_CLASS_NAME =
919 SchedulerEngine.class.getName();
920
921 private static final String _PLUGIN_READY = "plugin.ready";
922
923 private static final String _PORTAL_READY = "portal.ready";
924
925 private static Log _log = LogFactoryUtil.getLog(
926 ClusterSchedulerEngine.class);
927
928 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
929 SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
930 String.class, StorageType.class);
931 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
932 SchedulerEngineHelperUtil.class, "getScheduledJobs");
933 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
934 SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
935 StorageType.class);
936 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
937 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
938
939 private String _beanIdentifier;
940 private ClusterEventListener _clusterEventListener;
941 private volatile String _localClusterNodeAddress;
942 private volatile boolean _master;
943 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
944 _memoryClusteredJobs = new ConcurrentHashMap
945 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
946 private boolean _portalReady;
947 private java.util.concurrent.locks.Lock _readLock;
948 private SchedulerEngine _schedulerEngine;
949 private java.util.concurrent.locks.Lock _writeLock;
950
951 private static class SchedulerClusterInvokeAcceptor
952 implements ClusterInvokeAcceptor {
953
954 public boolean accept(Map<String, Serializable> context) {
955 if (ClusterInvokeThreadLocal.isEnabled()) {
956 return true;
957 }
958
959 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
960 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
961 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
962
963 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
964 !pluginReady) {
965
966 return false;
967 }
968
969 return true;
970 }
971
972 }
973
974 private class MemorySchedulerClusterEventListener
975 implements ClusterEventListener {
976
977 public void processClusterEvent(ClusterEvent clusterEvent) {
978 try {
979 getMasterAddressString(true);
980 }
981 catch (Exception e) {
982 _log.error("Unable to update memory scheduler cluster lock", e);
983 }
984 }
985
986 }
987
988 private class MemorySchedulerClusterResponseCallback
989 extends BaseClusterResponseCallback {
990
991 public MemorySchedulerClusterResponseCallback(Address address) {
992 _address = address;
993 }
994
995 @Override
996 public void callback(ClusterNodeResponses clusterNodeResponses) {
997 try {
998 ClusterNodeResponse clusterNodeResponse =
999 clusterNodeResponses.getClusterResponse(_address);
1000
1001 List<SchedulerResponse> schedulerResponses =
1002 (List<SchedulerResponse>)clusterNodeResponse.getResult();
1003
1004 _doMasterToSlave(schedulerResponses);
1005 }
1006 catch (Exception e) {
1007 _log.error(
1008 "Unable to load memory clustered jobs from cluster node " +
1009 _address.getDescription(),
1010 e);
1011 }
1012 }
1013
1014 @Override
1015 public void processTimeoutException(TimeoutException timeoutException) {
1016 _log.error(
1017 "Unable to load memory clustered jobs from cluster node " +
1018 _address.getDescription(),
1019 timeoutException);
1020 }
1021
1022 private Address _address;
1023
1024 }
1025
1026 }