001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018 import com.liferay.portal.kernel.bean.BeanReference;
019 import com.liferay.portal.kernel.bean.IdentifiableBean;
020 import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
021 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
022 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
023 import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
025 import com.liferay.portal.kernel.cluster.Clusterable;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.messaging.Message;
029 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
030 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
031 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
032 import com.liferay.portal.kernel.scheduler.SchedulerException;
033 import com.liferay.portal.kernel.scheduler.StorageType;
034 import com.liferay.portal.kernel.scheduler.Trigger;
035 import com.liferay.portal.kernel.scheduler.TriggerState;
036 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
037 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
038 import com.liferay.portal.kernel.util.MethodHandler;
039 import com.liferay.portal.kernel.util.MethodKey;
040 import com.liferay.portal.kernel.util.ObjectValuePair;
041 import com.liferay.portal.kernel.util.StringPool;
042 import com.liferay.portal.util.PropsValues;
043
044 import java.io.Serializable;
045
046 import java.util.Iterator;
047 import java.util.List;
048 import java.util.Map;
049 import java.util.Set;
050 import java.util.concurrent.ConcurrentHashMap;
051 import java.util.concurrent.Future;
052 import java.util.concurrent.TimeUnit;
053 import java.util.concurrent.locks.ReadWriteLock;
054 import java.util.concurrent.locks.ReentrantReadWriteLock;
055
056
059 public class ClusterSchedulerEngine
060 implements IdentifiableBean, SchedulerEngine {
061
062 public static SchedulerEngine createClusterSchedulerEngine(
063 SchedulerEngine schedulerEngine) {
064
065 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
066 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
067 }
068
069 return schedulerEngine;
070 }
071
072 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
073 _schedulerEngine = schedulerEngine;
074
075 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
076
077 _readLock = readWriteLock.readLock();
078 _writeLock = readWriteLock.writeLock();
079 }
080
081 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
082 @Override
083 public void delete(String groupName, StorageType storageType)
084 throws SchedulerException {
085
086 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
087 storageType);
088
089 _readLock.lock();
090
091 try {
092 if (memoryClusteredSlaveJob) {
093 removeMemoryClusteredJobs(groupName);
094 }
095 else {
096 _schedulerEngine.delete(groupName, storageType);
097 }
098 }
099 finally {
100 _readLock.unlock();
101 }
102
103 setClusterableThreadLocal(storageType);
104 }
105
106 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
107 @Override
108 public void delete(
109 String jobName, String groupName, StorageType storageType)
110 throws SchedulerException {
111
112 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
113 storageType);
114
115 _readLock.lock();
116
117 try {
118 if (memoryClusteredSlaveJob) {
119 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
120 }
121 else {
122 _schedulerEngine.delete(jobName, groupName, storageType);
123 }
124 }
125 finally {
126 _readLock.unlock();
127 }
128
129 setClusterableThreadLocal(storageType);
130 }
131
132 @Override
133 public String getBeanIdentifier() {
134 return _beanIdentifier;
135 }
136
137 @Clusterable(onMaster = true)
138 @Override
139 public SchedulerResponse getScheduledJob(
140 String jobName, String groupName, StorageType storageType)
141 throws SchedulerException {
142
143 _readLock.lock();
144
145 try {
146 return _schedulerEngine.getScheduledJob(
147 jobName, groupName, storageType);
148 }
149 finally {
150 _readLock.unlock();
151 }
152 }
153
154 @Clusterable(onMaster = true)
155 @Override
156 public List<SchedulerResponse> getScheduledJobs()
157 throws SchedulerException {
158
159 _readLock.lock();
160
161 try {
162 return _schedulerEngine.getScheduledJobs();
163 }
164 finally {
165 _readLock.unlock();
166 }
167 }
168
169 @Clusterable(onMaster = true)
170 @Override
171 public List<SchedulerResponse> getScheduledJobs(StorageType storageType)
172 throws SchedulerException {
173
174 _readLock.lock();
175
176 try {
177 return _schedulerEngine.getScheduledJobs(storageType);
178 }
179 finally {
180 _readLock.unlock();
181 }
182 }
183
184 @Clusterable(onMaster = true)
185 @Override
186 public List<SchedulerResponse> getScheduledJobs(
187 String groupName, StorageType storageType)
188 throws SchedulerException {
189
190 _readLock.lock();
191
192 try {
193 return _schedulerEngine.getScheduledJobs(groupName, storageType);
194 }
195 finally {
196 _readLock.unlock();
197 }
198 }
199
200 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
201 @Override
202 public void pause(String groupName, StorageType storageType)
203 throws SchedulerException {
204
205 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
206 storageType);
207
208 _readLock.lock();
209
210 try {
211 if (memoryClusteredSlaveJob) {
212 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
213 }
214 else {
215 _schedulerEngine.pause(groupName, storageType);
216 }
217 }
218 finally {
219 _readLock.unlock();
220 }
221
222 setClusterableThreadLocal(storageType);
223 }
224
225 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
226 @Override
227 public void pause(String jobName, String groupName, StorageType storageType)
228 throws SchedulerException {
229
230 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
231 storageType);
232
233 _readLock.lock();
234
235 try {
236 if (memoryClusteredSlaveJob) {
237 updateMemoryClusteredJob(
238 jobName, groupName, TriggerState.PAUSED);
239 }
240 else {
241 _schedulerEngine.pause(jobName, groupName, storageType);
242 }
243 }
244 finally {
245 _readLock.unlock();
246 }
247
248 setClusterableThreadLocal(storageType);
249 }
250
251 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
252 @Override
253 public void resume(String groupName, StorageType storageType)
254 throws SchedulerException {
255
256 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
257 storageType);
258
259 _readLock.lock();
260
261 try {
262 if (memoryClusteredSlaveJob) {
263 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
264 }
265 else {
266 _schedulerEngine.resume(groupName, storageType);
267 }
268 }
269 finally {
270 _readLock.unlock();
271 }
272
273 setClusterableThreadLocal(storageType);
274 }
275
276 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
277 @Override
278 public void resume(
279 String jobName, String groupName, StorageType storageType)
280 throws SchedulerException {
281
282 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
283 storageType);
284
285 _readLock.lock();
286
287 try {
288 if (memoryClusteredSlaveJob) {
289 updateMemoryClusteredJob(
290 jobName, groupName, TriggerState.NORMAL);
291 }
292 else {
293 _schedulerEngine.resume(jobName, groupName, storageType);
294 }
295 }
296 finally {
297 _readLock.unlock();
298 }
299
300 setClusterableThreadLocal(storageType);
301 }
302
303 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
304 @Override
305 public void schedule(
306 Trigger trigger, String description, String destinationName,
307 Message message, StorageType storageType)
308 throws SchedulerException {
309
310 String groupName = trigger.getGroupName();
311 String jobName = trigger.getJobName();
312
313 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
314 storageType);
315
316 _readLock.lock();
317
318 try {
319 if (memoryClusteredSlaveJob) {
320 SchedulerResponse schedulerResponse = new SchedulerResponse();
321
322 schedulerResponse.setDescription(description);
323 schedulerResponse.setDestinationName(destinationName);
324 schedulerResponse.setGroupName(groupName);
325 schedulerResponse.setJobName(jobName);
326 schedulerResponse.setMessage(message);
327 schedulerResponse.setTrigger(trigger);
328 schedulerResponse.setStorageType(storageType);
329
330 _memoryClusteredJobs.put(
331 getFullName(jobName, groupName),
332 new ObjectValuePair<SchedulerResponse, TriggerState>(
333 schedulerResponse, TriggerState.NORMAL));
334 }
335 else {
336 _schedulerEngine.schedule(
337 trigger, description, destinationName, message,
338 storageType);
339 }
340 }
341 finally {
342 _readLock.unlock();
343 }
344
345 setClusterableThreadLocal(storageType);
346 }
347
348 @Override
349 public void setBeanIdentifier(String beanIdentifier) {
350 _beanIdentifier = beanIdentifier;
351 }
352
353 @Override
354 public void shutdown() throws SchedulerException {
355 _portalReady = false;
356
357 ClusterMasterExecutorUtil.
358 unregisterClusterMasterTokenTransitionListener(
359 _schedulerClusterMasterTokenTransitionListener);
360
361 _schedulerEngine.shutdown();
362 }
363
364 @Override
365 public void start() throws SchedulerException {
366 try {
367 if (!ClusterMasterExecutorUtil.isMaster()) {
368 initMemoryClusteredJobs();
369 }
370
371 _schedulerClusterMasterTokenTransitionListener =
372 new SchedulerClusterMasterTokenTransitionListener();
373
374 ClusterMasterExecutorUtil.
375 registerClusterMasterTokenTransitionListener(
376 _schedulerClusterMasterTokenTransitionListener);
377 }
378 catch (Exception e) {
379 throw new SchedulerException("Unable to initialize scheduler", e);
380 }
381
382 _schedulerEngine.start();
383
384 _portalReady = true;
385 }
386
387 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
388 @Override
389 public void suppressError(
390 String jobName, String groupName, StorageType storageType)
391 throws SchedulerException {
392
393 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
394 storageType);
395
396 if (!memoryClusteredSlaveJob) {
397 _readLock.lock();
398
399 try {
400 _schedulerEngine.suppressError(jobName, groupName, storageType);
401 }
402 finally {
403 _readLock.unlock();
404 }
405 }
406
407 setClusterableThreadLocal(storageType);
408 }
409
410 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
411 @Override
412 public void unschedule(String groupName, StorageType storageType)
413 throws SchedulerException {
414
415 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
416 storageType);
417
418 _readLock.lock();
419
420 try {
421 if (memoryClusteredSlaveJob) {
422 removeMemoryClusteredJobs(groupName);
423 }
424 else {
425 _schedulerEngine.unschedule(groupName, storageType);
426 }
427 }
428 finally {
429 _readLock.unlock();
430 }
431
432 setClusterableThreadLocal(storageType);
433 }
434
435 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
436 @Override
437 public void unschedule(
438 String jobName, String groupName, StorageType storageType)
439 throws SchedulerException {
440
441 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
442 storageType);
443
444 _readLock.lock();
445
446 try {
447 if (memoryClusteredSlaveJob) {
448 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449 }
450 else {
451 _schedulerEngine.unschedule(jobName, groupName, storageType);
452 }
453 }
454 finally {
455 _readLock.unlock();
456 }
457
458 setClusterableThreadLocal(storageType);
459 }
460
461 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
462 @Override
463 public void update(Trigger trigger, StorageType storageType)
464 throws SchedulerException {
465
466 String jobName = trigger.getJobName();
467 String groupName = trigger.getGroupName();
468
469 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
470 storageType);
471
472 _readLock.lock();
473
474 try {
475 if (memoryClusteredSlaveJob) {
476 boolean updated = false;
477
478 for (ObjectValuePair<SchedulerResponse, TriggerState>
479 memoryClusteredJob : _memoryClusteredJobs.values()) {
480
481 SchedulerResponse schedulerResponse =
482 memoryClusteredJob.getKey();
483
484 if (jobName.equals(schedulerResponse.getJobName()) &&
485 groupName.equals(schedulerResponse.getGroupName())) {
486
487 schedulerResponse.setTrigger(trigger);
488
489 updated = true;
490
491 break;
492 }
493 }
494
495 if (!updated) {
496 throw new SchedulerException(
497 "Unable to update trigger for memory clustered job");
498 }
499 }
500 else {
501 _schedulerEngine.update(trigger, storageType);
502 }
503 }
504 finally {
505 _readLock.unlock();
506 }
507
508 setClusterableThreadLocal(storageType);
509 }
510
511 protected String getFullName(String jobName, String groupName) {
512 return groupName.concat(StringPool.PERIOD).concat(jobName);
513 }
514
515 protected void initMemoryClusteredJobs() throws Exception {
516 MethodHandler methodHandler = new MethodHandler(
517 _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
518
519 Future<List<SchedulerResponse>> future =
520 ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
521
522 List<SchedulerResponse> schedulerResponses = future.get(
523 PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
524 TimeUnit.SECONDS);
525
526 for (SchedulerResponse schedulerResponse : schedulerResponses) {
527 String jobName = schedulerResponse.getJobName();
528 String groupName = schedulerResponse.getGroupName();
529
530 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
531 schedulerResponse);
532
533 Message message = schedulerResponse.getMessage();
534
535 message.remove(JOB_STATE);
536
537 _memoryClusteredJobs.put(
538 getFullName(jobName, groupName),
539 new ObjectValuePair<SchedulerResponse, TriggerState>(
540 schedulerResponse, triggerState));
541 }
542 }
543
544 protected boolean isMemoryClusteredSlaveJob(StorageType storageType) {
545 if ((storageType != StorageType.MEMORY_CLUSTERED) ||
546 ClusterMasterExecutorUtil.isMaster()) {
547
548 return false;
549 }
550
551 return true;
552 }
553
554 protected void removeMemoryClusteredJobs(String groupName) {
555 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
556 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
557
558 Iterator
559 <Map.Entry<String,
560 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
561 memoryClusteredJobs.iterator();
562
563 while (itr.hasNext()) {
564 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
565 entry = itr.next();
566
567 ObjectValuePair<SchedulerResponse, TriggerState>
568 memoryClusteredJob = entry.getValue();
569
570 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
571
572 if (groupName.equals(schedulerResponse.getGroupName())) {
573 itr.remove();
574 }
575 }
576 }
577
578 protected void setClusterableThreadLocal(StorageType storageType) {
579 ClusterableContextThreadLocal.putThreadLocalContext(
580 STORAGE_TYPE, storageType);
581 ClusterableContextThreadLocal.putThreadLocalContext(
582 _PORTAL_READY, _portalReady);
583
584 boolean pluginReady = true;
585
586 if (PluginContextLifecycleThreadLocal.isInitializing() ||
587 PluginContextLifecycleThreadLocal.isDestroying()) {
588
589 pluginReady = false;
590 }
591
592 ClusterableContextThreadLocal.putThreadLocalContext(
593 _PLUGIN_READY, pluginReady);
594 }
595
596 protected void updateMemoryClusteredJob(
597 String jobName, String groupName, TriggerState triggerState) {
598
599 ObjectValuePair<SchedulerResponse, TriggerState>
600 memoryClusteredJob = _memoryClusteredJobs.get(
601 getFullName(jobName, groupName));
602
603 if (memoryClusteredJob != null) {
604 memoryClusteredJob.setValue(triggerState);
605 }
606 }
607
608 protected void updateMemoryClusteredJobs(
609 String groupName, TriggerState triggerState) {
610
611 for (ObjectValuePair<SchedulerResponse, TriggerState>
612 memoryClusteredJob : _memoryClusteredJobs.values()) {
613
614 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
615
616 if (groupName.equals(schedulerResponse.getGroupName())) {
617 memoryClusteredJob.setValue(triggerState);
618 }
619 }
620 }
621
622 @BeanReference(
623 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
624 protected SchedulerEngine schedulerEngine;
625
626 private static final String _PLUGIN_READY = "plugin.ready";
627
628 private static final String _PORTAL_READY = "portal.ready";
629
630 private static final Log _log = LogFactoryUtil.getLog(
631 ClusterSchedulerEngine.class);
632
633 private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
634 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
635
636 private String _beanIdentifier;
637 private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
638 _memoryClusteredJobs = new ConcurrentHashMap
639 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
640 private boolean _portalReady;
641 private final java.util.concurrent.locks.Lock _readLock;
642 private ClusterMasterTokenTransitionListener
643 _schedulerClusterMasterTokenTransitionListener;
644 private final SchedulerEngine _schedulerEngine;
645 private final java.util.concurrent.locks.Lock _writeLock;
646
647 private static class SchedulerClusterInvokeAcceptor
648 implements ClusterInvokeAcceptor {
649
650 @Override
651 public boolean accept(Map<String, Serializable> context) {
652 if (!ClusterInvokeThreadLocal.isEnabled()) {
653 return false;
654 }
655
656 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
657 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
658 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
659
660 if ((storageType == StorageType.PERSISTED) || !portalReady ||
661 !pluginReady) {
662
663 return false;
664 }
665
666 return true;
667 }
668
669 }
670
671 private class SchedulerClusterMasterTokenTransitionListener
672 extends BaseClusterMasterTokenTransitionListener {
673
674 @Override
675 protected void doMasterTokenAcquired() throws Exception {
676 boolean forceSync = ProxyModeThreadLocal.isForceSync();
677
678 ProxyModeThreadLocal.setForceSync(true);
679
680 _writeLock.lock();
681
682 try {
683 for (ObjectValuePair<SchedulerResponse, TriggerState>
684 memoryClusteredJob : _memoryClusteredJobs.values()) {
685
686 SchedulerResponse schedulerResponse =
687 memoryClusteredJob.getKey();
688
689 _schedulerEngine.schedule(
690 schedulerResponse.getTrigger(),
691 schedulerResponse.getDescription(),
692 schedulerResponse.getDestinationName(),
693 schedulerResponse.getMessage(),
694 schedulerResponse.getStorageType());
695
696 TriggerState triggerState = memoryClusteredJob.getValue();
697
698 if (triggerState.equals(TriggerState.PAUSED)) {
699 _schedulerEngine.pause(
700 schedulerResponse.getJobName(),
701 schedulerResponse.getGroupName(),
702 schedulerResponse.getStorageType());
703 }
704 }
705
706 _memoryClusteredJobs.clear();
707
708 if (_log.isInfoEnabled()) {
709 _log.info("MEMORY_CLUSTERED jobs are running on this node");
710 }
711 }
712 finally {
713 ProxyModeThreadLocal.setForceSync(forceSync);
714
715 _writeLock.unlock();
716 }
717 }
718
719 @Override
720 protected void doMasterTokenReleased() throws Exception {
721 _writeLock.lock();
722
723 try {
724 for (SchedulerResponse schedulerResponse :
725 _schedulerEngine.getScheduledJobs()) {
726
727 if (StorageType.MEMORY_CLUSTERED ==
728 schedulerResponse.getStorageType()) {
729
730 _schedulerEngine.delete(
731 schedulerResponse.getJobName(),
732 schedulerResponse.getGroupName(),
733 schedulerResponse.getStorageType());
734 }
735 }
736
737 initMemoryClusteredJobs();
738
739 if (_log.isInfoEnabled()) {
740 _log.info(
741 "MEMORY_CLUSTERED jobs stopped running on this node");
742 }
743 }
744 finally {
745 _writeLock.unlock();
746 }
747 }
748
749 }
750
751 }