001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019 import com.liferay.portal.kernel.bean.BeanReference;
020 import com.liferay.portal.kernel.bean.IdentifiableBean;
021 import com.liferay.portal.kernel.cluster.Address;
022 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023 import com.liferay.portal.kernel.cluster.ClusterEvent;
024 import com.liferay.portal.kernel.cluster.ClusterEventListener;
025 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029 import com.liferay.portal.kernel.cluster.ClusterRequest;
030 import com.liferay.portal.kernel.cluster.Clusterable;
031 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034 import com.liferay.portal.kernel.log.Log;
035 import com.liferay.portal.kernel.log.LogFactoryUtil;
036 import com.liferay.portal.kernel.messaging.Message;
037 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041 import com.liferay.portal.kernel.scheduler.SchedulerException;
042 import com.liferay.portal.kernel.scheduler.StorageType;
043 import com.liferay.portal.kernel.scheduler.Trigger;
044 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045 import com.liferay.portal.kernel.scheduler.TriggerState;
046 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048 import com.liferay.portal.kernel.util.Base64;
049 import com.liferay.portal.kernel.util.CharPool;
050 import com.liferay.portal.kernel.util.MethodHandler;
051 import com.liferay.portal.kernel.util.MethodKey;
052 import com.liferay.portal.kernel.util.ObjectValuePair;
053 import com.liferay.portal.kernel.util.StringPool;
054 import com.liferay.portal.model.Lock;
055 import com.liferay.portal.service.LockLocalServiceUtil;
056 import com.liferay.portal.util.PropsValues;
057
058 import java.io.ObjectInputStream;
059 import java.io.ObjectOutputStream;
060 import java.io.Serializable;
061
062 import java.util.Iterator;
063 import java.util.List;
064 import java.util.Map;
065 import java.util.Set;
066 import java.util.concurrent.ConcurrentHashMap;
067 import java.util.concurrent.TimeUnit;
068 import java.util.concurrent.TimeoutException;
069 import java.util.concurrent.locks.ReadWriteLock;
070 import java.util.concurrent.locks.ReentrantReadWriteLock;
071
072
075 public class ClusterSchedulerEngine
076 implements IdentifiableBean, SchedulerEngine,
077 SchedulerEngineClusterManager {
078
079 public static SchedulerEngine createClusterSchedulerEngine(
080 SchedulerEngine schedulerEngine) {
081
082 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084 }
085
086 return schedulerEngine;
087 }
088
/**
 * Stores the scheduler engine that this instance delegates to.
 *
 * @param schedulerEngine the engine receiving the delegated calls
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
092
093 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094 @Override
095 public void delete(String groupName) throws SchedulerException {
096 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
097
098 _readLock.lock();
099
100 try {
101 if (memoryClusteredSlaveJob) {
102 removeMemoryClusteredJobs(groupName);
103 }
104 else {
105 _schedulerEngine.delete(groupName);
106 }
107 }
108 finally {
109 _readLock.unlock();
110 }
111
112 setClusterableThreadLocal(groupName);
113 }
114
115 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
116 @Override
117 public void delete(String jobName, String groupName)
118 throws SchedulerException {
119
120 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
121
122 _readLock.lock();
123
124 try {
125 if (memoryClusteredSlaveJob) {
126 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
127 }
128 else {
129 _schedulerEngine.delete(jobName, groupName);
130 }
131 }
132 finally {
133 _readLock.unlock();
134 }
135
136 setClusterableThreadLocal(groupName);
137 }
138
139 @Override
140 public String getBeanIdentifier() {
141 return _beanIdentifier;
142 }
143
144 @Override
145 public SchedulerResponse getScheduledJob(String jobName, String groupName)
146 throws SchedulerException {
147
148 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
149 groupName);
150
151 StorageType storageType = objectValuePair.getValue();
152
153 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
154 String masterAddressString = getMasterAddressString(false);
155
156 if (!_localClusterNodeAddress.equals(masterAddressString)) {
157 return (SchedulerResponse)callMaster(
158 masterAddressString, _getScheduledJobMethodKey, jobName,
159 objectValuePair.getKey(), storageType);
160 }
161 }
162
163 _readLock.lock();
164
165 try {
166 return _schedulerEngine.getScheduledJob(jobName, groupName);
167 }
168 finally {
169 _readLock.unlock();
170 }
171 }
172
173 @Override
174 public List<SchedulerResponse> getScheduledJobs()
175 throws SchedulerException {
176
177 String masterAddressString = getMasterAddressString(false);
178
179 if (!_localClusterNodeAddress.equals(masterAddressString)) {
180 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
181 }
182
183 _readLock.lock();
184
185 try {
186 return _schedulerEngine.getScheduledJobs();
187 }
188 finally {
189 _readLock.unlock();
190 }
191 }
192
193 @Override
194 public List<SchedulerResponse> getScheduledJobs(String groupName)
195 throws SchedulerException {
196
197 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
198 groupName);
199
200 StorageType storageType = objectValuePair.getValue();
201
202 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
203 String masterAddressString = getMasterAddressString(false);
204
205 if (!_localClusterNodeAddress.equals(masterAddressString)) {
206 return callMaster(
207 masterAddressString, _getScheduledJobsMethodKey2,
208 objectValuePair.getKey(), storageType);
209 }
210 }
211
212 _readLock.lock();
213
214 try {
215 return _schedulerEngine.getScheduledJobs(groupName);
216 }
217 finally {
218 _readLock.unlock();
219 }
220 }
221
222 @Override
223 public void initialize() throws SchedulerException {
224 try {
225 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
226
227 _readLock = readWriteLock.readLock();
228 _writeLock = readWriteLock.writeLock();
229
230 _localClusterNodeAddress = getSerializedString(
231 ClusterExecutorUtil.getLocalClusterNodeAddress());
232
233 _clusterEventListener = new MemorySchedulerClusterEventListener();
234
235 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
236
237 String masterAddressString = getMasterAddressString(false);
238
239 if (!_localClusterNodeAddress.equals(masterAddressString)) {
240 List<SchedulerResponse> schedulerResponses = callMaster(
241 masterAddressString, _getScheduledJobsMethodKey3,
242 StorageType.MEMORY_CLUSTERED);
243
244 initMemoryClusteredJobs(schedulerResponses);
245 }
246 }
247 catch (Exception e) {
248 throw new SchedulerException("Unable to initialize scheduler", e);
249 }
250 }
251
252 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
253 @Override
254 public void pause(String groupName) throws SchedulerException {
255 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
256
257 _readLock.lock();
258
259 try {
260 if (memoryClusteredSlaveJob) {
261 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
262 }
263 else {
264 _schedulerEngine.pause(groupName);
265 }
266 }
267 finally {
268 _readLock.unlock();
269 }
270
271 setClusterableThreadLocal(groupName);
272 }
273
274 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
275 @Override
276 public void pause(String jobName, String groupName)
277 throws SchedulerException {
278
279 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
280
281 _readLock.lock();
282
283 try {
284 if (memoryClusteredSlaveJob) {
285 updateMemoryClusteredJob(
286 jobName, groupName, TriggerState.PAUSED);
287 }
288 else {
289 _schedulerEngine.pause(jobName, groupName);
290 }
291 }
292 finally {
293 _readLock.unlock();
294 }
295
296 setClusterableThreadLocal(groupName);
297 }
298
299 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
300 @Override
301 public void resume(String groupName) throws SchedulerException {
302 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
303
304 _readLock.lock();
305
306 try {
307 if (memoryClusteredSlaveJob) {
308 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
309 }
310 else {
311 _schedulerEngine.resume(groupName);
312 }
313 }
314 finally {
315 _readLock.unlock();
316 }
317
318 setClusterableThreadLocal(groupName);
319 }
320
321 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
322 @Override
323 public void resume(String jobName, String groupName)
324 throws SchedulerException {
325
326 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
327
328 _readLock.lock();
329
330 try {
331 if (memoryClusteredSlaveJob) {
332 updateMemoryClusteredJob(
333 jobName, groupName, TriggerState.NORMAL);
334 }
335 else {
336 _schedulerEngine.resume(jobName, groupName);
337 }
338 }
339 finally {
340 _readLock.unlock();
341 }
342
343 setClusterableThreadLocal(groupName);
344 }
345
346 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
347 @Override
348 public void schedule(
349 Trigger trigger, String description, String destinationName,
350 Message message)
351 throws SchedulerException {
352
353 String groupName = trigger.getGroupName();
354 String jobName = trigger.getJobName();
355
356 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
357
358 _readLock.lock();
359
360 try {
361 if (memoryClusteredSlaveJob) {
362 SchedulerResponse schedulerResponse = new SchedulerResponse();
363
364 schedulerResponse.setDescription(description);
365 schedulerResponse.setDestinationName(destinationName);
366 schedulerResponse.setGroupName(groupName);
367 schedulerResponse.setJobName(jobName);
368 schedulerResponse.setMessage(message);
369 schedulerResponse.setTrigger(trigger);
370
371 _memoryClusteredJobs.put(
372 getFullName(jobName, groupName),
373 new ObjectValuePair<SchedulerResponse, TriggerState>(
374 schedulerResponse, TriggerState.NORMAL));
375 }
376 else {
377 _schedulerEngine.schedule(
378 trigger, description, destinationName, message);
379 }
380 }
381 finally {
382 _readLock.unlock();
383 }
384
385 setClusterableThreadLocal(groupName);
386 }
387
388 @Override
389 public void setBeanIdentifier(String beanIdentifier) {
390 _beanIdentifier = beanIdentifier;
391 }
392
393 @Override
394 public void shutdown() throws SchedulerException {
395 _portalReady = false;
396
397 try {
398 ClusterExecutorUtil.removeClusterEventListener(
399 _clusterEventListener);
400
401 LockLocalServiceUtil.unlock(
402 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
403 }
404 catch (Exception e) {
405 throw new SchedulerException("Unable to shutdown scheduler", e);
406 }
407
408 _schedulerEngine.shutdown();
409 }
410
411 @Override
412 public void start() throws SchedulerException {
413 _schedulerEngine.start();
414
415 _portalReady = true;
416 }
417
418 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
419 @Override
420 public void suppressError(String jobName, String groupName)
421 throws SchedulerException {
422
423 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
424
425 if (!memoryClusteredSlaveJob) {
426 _readLock.lock();
427
428 try {
429 _schedulerEngine.suppressError(jobName, groupName);
430 }
431 finally {
432 _readLock.unlock();
433 }
434 }
435
436 setClusterableThreadLocal(groupName);
437 }
438
439 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
440 @Override
441 public void unschedule(String groupName) throws SchedulerException {
442 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443
444 _readLock.lock();
445
446 try {
447 if (memoryClusteredSlaveJob) {
448 removeMemoryClusteredJobs(groupName);
449 }
450 else {
451 _schedulerEngine.unschedule(groupName);
452 }
453 }
454 finally {
455 _readLock.unlock();
456 }
457
458 setClusterableThreadLocal(groupName);
459 }
460
461 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
462 @Override
463 public void unschedule(String jobName, String groupName)
464 throws SchedulerException {
465
466 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467
468 _readLock.lock();
469
470 try {
471 if (memoryClusteredSlaveJob) {
472 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
473 }
474 else {
475 _schedulerEngine.unschedule(jobName, groupName);
476 }
477 }
478 finally {
479 _readLock.unlock();
480 }
481
482 setClusterableThreadLocal(groupName);
483 }
484
485 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
486 @Override
487 public void update(Trigger trigger) throws SchedulerException {
488 String jobName = trigger.getJobName();
489 String groupName = trigger.getGroupName();
490
491 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
492
493 _readLock.lock();
494
495 try {
496 if (memoryClusteredSlaveJob) {
497 boolean updated = false;
498
499 for (ObjectValuePair<SchedulerResponse, TriggerState>
500 memoryClusteredJob : _memoryClusteredJobs.values()) {
501
502 SchedulerResponse schedulerResponse =
503 memoryClusteredJob.getKey();
504
505 if (jobName.equals(schedulerResponse.getJobName()) &&
506 groupName.equals(schedulerResponse.getGroupName())) {
507
508 schedulerResponse.setTrigger(trigger);
509
510 updated = true;
511
512 break;
513 }
514 }
515
516 if (!updated) {
517 throw new SchedulerException(
518 "Unable to update trigger for memory clustered job");
519 }
520 }
521 else {
522 _schedulerEngine.update(trigger);
523 }
524 }
525 finally {
526 _readLock.unlock();
527 }
528
529 setClusterableThreadLocal(groupName);
530 }
531
532 @Override
533 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
534 getMasterAddressString(false);
535
536 return null;
537 }
538
539 protected <T> T callMaster(
540 String masterAddressString, MethodKey methodKey,
541 Object... arguments)
542 throws SchedulerException {
543
544 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
545
546 Address address = (Address)getDeserializedObject(masterAddressString);
547
548 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
549 methodHandler, address);
550
551 try {
552 FutureClusterResponses futureClusterResponses =
553 ClusterExecutorUtil.execute(clusterRequest);
554
555 ClusterNodeResponses clusterNodeResponses =
556 futureClusterResponses.get(20, TimeUnit.SECONDS);
557
558 ClusterNodeResponse clusterNodeResponse =
559 clusterNodeResponses.getClusterResponse(address);
560
561 return (T)clusterNodeResponse.getResult();
562 }
563 catch (Exception e) {
564 throw new SchedulerException(
565 "Unable to load scheduled jobs from cluster node " +
566 address.getDescription(),
567 e);
568 }
569 }
570
571 protected Object getDeserializedObject(String string)
572 throws SchedulerException {
573
574 byte[] bytes = Base64.decode(string);
575
576 UnsyncByteArrayInputStream byteArrayInputStream =
577 new UnsyncByteArrayInputStream(bytes);
578
579 ObjectInputStream objectInputStream = null;
580
581 try {
582 objectInputStream = new ObjectInputStream(byteArrayInputStream);
583
584 Object object = objectInputStream.readObject();
585
586 return object;
587 }
588 catch (Exception e) {
589 throw new SchedulerException(
590 "Unable to deserialize object from " + string, e);
591 }
592 finally {
593 try {
594 objectInputStream.close();
595 }
596 catch (Exception e) {
597 }
598 }
599 }
600
601 protected String getFullName(String jobName, String groupName) {
602 return groupName.concat(StringPool.PERIOD).concat(jobName);
603 }
604
605 protected String getMasterAddressString(boolean asynchronous)
606 throws SchedulerException {
607
608 String owner = null;
609
610 Lock lock = null;
611
612 while (true) {
613 try {
614 if (owner == null) {
615 lock = LockLocalServiceUtil.lock(
616 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
617 _localClusterNodeAddress);
618 }
619 else {
620 lock = LockLocalServiceUtil.lock(
621 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
622 _localClusterNodeAddress);
623 }
624
625 Address address = (Address)getDeserializedObject(
626 lock.getOwner());
627
628 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
629 break;
630 }
631 else {
632 owner = lock.getOwner();
633 }
634 }
635 catch (Exception e) {
636 if (_log.isWarnEnabled()) {
637 _log.warn(
638 "Unable to obtain memory scheduler cluster lock. " +
639 "Trying again.");
640 }
641 }
642 }
643
644 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
645
646 if (master == _master) {
647 return lock.getOwner();
648 }
649
650 if (master) {
651 slaveToMaster();
652 }
653 else {
654 masterToSlave(lock.getOwner(), asynchronous);
655 }
656
657 return lock.getOwner();
658 }
659
660 protected String getSerializedString(Object object) throws Exception {
661 UnsyncByteArrayOutputStream byteArrayOutputStream =
662 new UnsyncByteArrayOutputStream();
663
664 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
665 byteArrayOutputStream);
666
667 objectOutputStream.writeObject(object);
668 objectOutputStream.close();
669
670 byte[] bytes = byteArrayOutputStream.toByteArray();
671
672 return Base64.encode(bytes);
673 }
674
675 protected void initMemoryClusteredJobs(
676 List<SchedulerResponse> schedulerResponses)
677 throws Exception {
678
679 for (SchedulerResponse schedulerResponse : schedulerResponses) {
680 Trigger oldTrigger = schedulerResponse.getTrigger();
681
682 String jobName = schedulerResponse.getJobName();
683 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
684 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
685
686 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
687 oldTrigger.getTriggerType(), jobName, groupName,
688 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
689 oldTrigger.getTriggerContent());
690
691 schedulerResponse.setTrigger(newTrigger);
692
693 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
694 schedulerResponse);
695
696 Message message = schedulerResponse.getMessage();
697
698 message.remove(JOB_STATE);
699
700 _memoryClusteredJobs.put(
701 getFullName(jobName, groupName),
702 new ObjectValuePair<SchedulerResponse, TriggerState>(
703 schedulerResponse, triggerState));
704 }
705 }
706
707 protected boolean isMemoryClusteredSlaveJob(String groupName)
708 throws SchedulerException {
709
710 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
711 groupName);
712
713 StorageType storageType = objectValuePair.getValue();
714
715 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
716 return false;
717 }
718
719 String masterAddressString = getMasterAddressString(false);
720
721 if (_localClusterNodeAddress.equals(masterAddressString)) {
722 return false;
723 }
724
725 return true;
726 }
727
728 protected void masterToSlave(
729 String masterAddressString, boolean asynchronous)
730 throws SchedulerException {
731
732 if (asynchronous) {
733 MethodHandler methodHandler = new MethodHandler(
734 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
735
736 Address address = (Address)getDeserializedObject(
737 masterAddressString);
738
739 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
740 methodHandler, address);
741
742 try {
743 ClusterExecutorUtil.execute(
744 clusterRequest,
745 new MemorySchedulerClusterResponseCallback(address), 20,
746 TimeUnit.SECONDS);
747
748 return;
749 }
750 catch (Exception e) {
751 throw new SchedulerException(
752 "Unable to load scheduled jobs from cluster node " +
753 address.getDescription(),
754 e);
755 }
756 }
757
758 List<SchedulerResponse> schedulerResponses = callMaster(
759 masterAddressString, _getScheduledJobsMethodKey3,
760 StorageType.MEMORY_CLUSTERED);
761
762 _doMasterToSlave(schedulerResponses);
763 }
764
765 protected void removeMemoryClusteredJobs(String groupName) {
766 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
767 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
768
769 Iterator
770 <Map.Entry<String,
771 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
772 memoryClusteredJobs.iterator();
773
774 while (itr.hasNext()) {
775 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
776 entry = itr.next();
777
778 ObjectValuePair<SchedulerResponse, TriggerState>
779 memoryClusteredJob = entry.getValue();
780
781 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
782
783 if (groupName.equals(schedulerResponse.getGroupName())) {
784 itr.remove();
785 }
786 }
787 }
788
789 protected ObjectValuePair<String, StorageType> resolveGroupName(
790 String groupName) {
791
792 int index = groupName.indexOf(CharPool.POUND);
793
794 String storageTypeString = groupName.substring(0, index);
795
796 StorageType storageType = StorageType.valueOf(storageTypeString);
797
798 String orginalGroupName = groupName.substring(index + 1);
799
800 return new ObjectValuePair<String, StorageType>(
801 orginalGroupName, storageType);
802 }
803
804 protected void setClusterableThreadLocal(String groupName) {
805 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
806 groupName);
807
808 ClusterableContextThreadLocal.putThreadLocalContext(
809 STORAGE_TYPE, objectValuePair.getValue());
810 ClusterableContextThreadLocal.putThreadLocalContext(
811 _PORTAL_READY, _portalReady);
812
813 boolean pluginReady = true;
814
815 if (PluginContextLifecycleThreadLocal.isInitializing() ||
816 PluginContextLifecycleThreadLocal.isDestroying()) {
817
818 pluginReady = false;
819 }
820
821 ClusterableContextThreadLocal.putThreadLocalContext(
822 _PLUGIN_READY, pluginReady);
823 }
824
825 protected void slaveToMaster() throws SchedulerException {
826 boolean forceSync = ProxyModeThreadLocal.isForceSync();
827
828 ProxyModeThreadLocal.setForceSync(true);
829
830 _writeLock.lock();
831
832 try {
833 for (ObjectValuePair<SchedulerResponse, TriggerState>
834 memoryClusteredJob : _memoryClusteredJobs.values()) {
835
836 SchedulerResponse schedulerResponse =
837 memoryClusteredJob.getKey();
838
839 _schedulerEngine.schedule(
840 schedulerResponse.getTrigger(),
841 schedulerResponse.getDescription(),
842 schedulerResponse.getDestinationName(),
843 schedulerResponse.getMessage());
844
845 TriggerState triggerState = memoryClusteredJob.getValue();
846
847 if (triggerState.equals(TriggerState.PAUSED)) {
848 _schedulerEngine.pause(
849 schedulerResponse.getJobName(),
850 schedulerResponse.getGroupName());
851 }
852 }
853
854 _memoryClusteredJobs.clear();
855 }
856 finally {
857 ProxyModeThreadLocal.setForceSync(forceSync);
858
859 _master = true;
860
861 _writeLock.unlock();
862 }
863 }
864
865 protected void updateMemoryClusteredJob(
866 String jobName, String groupName, TriggerState triggerState) {
867
868 ObjectValuePair<SchedulerResponse, TriggerState>
869 memoryClusteredJob = _memoryClusteredJobs.get(
870 getFullName(jobName, groupName));
871
872 if (memoryClusteredJob != null) {
873 memoryClusteredJob.setValue(triggerState);
874 }
875 }
876
877 protected void updateMemoryClusteredJobs(
878 String groupName, TriggerState triggerState) {
879
880 for (ObjectValuePair<SchedulerResponse, TriggerState>
881 memoryClusteredJob : _memoryClusteredJobs.values()) {
882
883 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
884
885 if (groupName.equals(schedulerResponse.getGroupName())) {
886 memoryClusteredJob.setValue(triggerState);
887 }
888 }
889 }
890
891 @BeanReference(
892 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
893 protected SchedulerEngine schedulerEngine;
894
895 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
896 throws SchedulerException {
897
898 _writeLock.lock();
899
900 try {
901 for (SchedulerResponse schedulerResponse :
902 _schedulerEngine.getScheduledJobs()) {
903
904 if (StorageType.MEMORY_CLUSTERED ==
905 schedulerResponse.getStorageType()) {
906
907 String groupName = StorageType.MEMORY_CLUSTERED.toString();
908
909 groupName = groupName.concat(StringPool.POUND).concat(
910 schedulerResponse.getGroupName());
911
912 _schedulerEngine.delete(
913 schedulerResponse.getJobName(), groupName);
914 }
915 }
916
917 initMemoryClusteredJobs(schedulerResponses);
918
919 if (_log.isInfoEnabled()) {
920 _log.info("Switched current node from master to slave");
921 }
922 }
923 catch (Exception e) {
924 throw new SchedulerException(e);
925 }
926 finally {
927 _master = false;
928
929 _writeLock.unlock();
930 }
931 }
932
933 private static final String _LOCK_CLASS_NAME =
934 SchedulerEngine.class.getName();
935
936 private static final String _PLUGIN_READY = "plugin.ready";
937
938 private static final String _PORTAL_READY = "portal.ready";
939
940 private static Log _log = LogFactoryUtil.getLog(
941 ClusterSchedulerEngine.class);
942
943 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
944 SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
945 String.class, StorageType.class);
946 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
947 SchedulerEngineHelperUtil.class, "getScheduledJobs");
948 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
949 SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
950 StorageType.class);
951 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
952 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
953
954 private String _beanIdentifier;
955 private ClusterEventListener _clusterEventListener;
956 private volatile String _localClusterNodeAddress;
957 private volatile boolean _master;
958 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
959 _memoryClusteredJobs = new ConcurrentHashMap
960 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
961 private boolean _portalReady;
962 private java.util.concurrent.locks.Lock _readLock;
963 private SchedulerEngine _schedulerEngine;
964 private java.util.concurrent.locks.Lock _writeLock;
965
966 private static class SchedulerClusterInvokeAcceptor
967 implements ClusterInvokeAcceptor {
968
969 @Override
970 public boolean accept(Map<String, Serializable> context) {
971 if (ClusterInvokeThreadLocal.isEnabled()) {
972 return true;
973 }
974
975 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
976 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
977 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
978
979 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
980 !pluginReady) {
981
982 return false;
983 }
984
985 return true;
986 }
987
988 }
989
990 private class MemorySchedulerClusterEventListener
991 implements ClusterEventListener {
992
993 @Override
994 public void processClusterEvent(ClusterEvent clusterEvent) {
995 try {
996 getMasterAddressString(true);
997 }
998 catch (Exception e) {
999 _log.error("Unable to update memory scheduler cluster lock", e);
1000 }
1001 }
1002
1003 }
1004
1005 private class MemorySchedulerClusterResponseCallback
1006 extends BaseClusterResponseCallback {
1007
1008 public MemorySchedulerClusterResponseCallback(Address address) {
1009 _address = address;
1010 }
1011
1012 @Override
1013 public void callback(ClusterNodeResponses clusterNodeResponses) {
1014 try {
1015 ClusterNodeResponse clusterNodeResponse =
1016 clusterNodeResponses.getClusterResponse(_address);
1017
1018 List<SchedulerResponse> schedulerResponses =
1019 (List<SchedulerResponse>)clusterNodeResponse.getResult();
1020
1021 _doMasterToSlave(schedulerResponses);
1022 }
1023 catch (Exception e) {
1024 _log.error(
1025 "Unable to load memory clustered jobs from cluster node " +
1026 _address.getDescription(),
1027 e);
1028 }
1029 }
1030
1031 @Override
1032 public void processTimeoutException(TimeoutException timeoutException) {
1033 _log.error(
1034 "Unable to load memory clustered jobs from cluster node " +
1035 _address.getDescription(),
1036 timeoutException);
1037 }
1038
1039 private Address _address;
1040
1041 }
1042
1043 }