001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018 import com.liferay.portal.kernel.bean.BeanReference;
019 import com.liferay.portal.kernel.bean.IdentifiableBean;
020 import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
021 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
022 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
023 import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
025 import com.liferay.portal.kernel.cluster.Clusterable;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.messaging.Message;
029 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
030 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
031 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
032 import com.liferay.portal.kernel.scheduler.SchedulerException;
033 import com.liferay.portal.kernel.scheduler.StorageType;
034 import com.liferay.portal.kernel.scheduler.Trigger;
035 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
036 import com.liferay.portal.kernel.scheduler.TriggerState;
037 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
038 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
039 import com.liferay.portal.kernel.util.CharPool;
040 import com.liferay.portal.kernel.util.MethodHandler;
041 import com.liferay.portal.kernel.util.MethodKey;
042 import com.liferay.portal.kernel.util.ObjectValuePair;
043 import com.liferay.portal.kernel.util.StringPool;
044 import com.liferay.portal.util.PropsValues;
045
046 import java.io.Serializable;
047
048 import java.util.Iterator;
049 import java.util.List;
050 import java.util.Map;
051 import java.util.Set;
052 import java.util.concurrent.ConcurrentHashMap;
053 import java.util.concurrent.Future;
054 import java.util.concurrent.TimeUnit;
055 import java.util.concurrent.locks.ReadWriteLock;
056 import java.util.concurrent.locks.ReentrantReadWriteLock;
057
058
061 public class ClusterSchedulerEngine
062 implements IdentifiableBean, SchedulerEngine {
063
064 public static SchedulerEngine createClusterSchedulerEngine(
065 SchedulerEngine schedulerEngine) {
066
067 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
068 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
069 }
070
071 return schedulerEngine;
072 }
073
074 public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
075 _schedulerEngine = schedulerEngine;
076
077 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
078
079 _readLock = readWriteLock.readLock();
080 _writeLock = readWriteLock.writeLock();
081 }
082
083 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
084 @Override
085 public void delete(String groupName) throws SchedulerException {
086 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
087
088 _readLock.lock();
089
090 try {
091 if (memoryClusteredSlaveJob) {
092 removeMemoryClusteredJobs(groupName);
093 }
094 else {
095 _schedulerEngine.delete(groupName);
096 }
097 }
098 finally {
099 _readLock.unlock();
100 }
101
102 setClusterableThreadLocal(groupName);
103 }
104
105 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
106 @Override
107 public void delete(String jobName, String groupName)
108 throws SchedulerException {
109
110 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
111
112 _readLock.lock();
113
114 try {
115 if (memoryClusteredSlaveJob) {
116 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
117 }
118 else {
119 _schedulerEngine.delete(jobName, groupName);
120 }
121 }
122 finally {
123 _readLock.unlock();
124 }
125
126 setClusterableThreadLocal(groupName);
127 }
128
129 @Override
130 public String getBeanIdentifier() {
131 return _beanIdentifier;
132 }
133
134 @Clusterable(onMaster = true)
135 @Override
136 public SchedulerResponse getScheduledJob(String jobName, String groupName)
137 throws SchedulerException {
138
139 _readLock.lock();
140
141 try {
142 return _schedulerEngine.getScheduledJob(jobName, groupName);
143 }
144 finally {
145 _readLock.unlock();
146 }
147 }
148
149 @Clusterable(onMaster = true)
150 @Override
151 public List<SchedulerResponse> getScheduledJobs()
152 throws SchedulerException {
153
154 _readLock.lock();
155
156 try {
157 return _schedulerEngine.getScheduledJobs();
158 }
159 finally {
160 _readLock.unlock();
161 }
162 }
163
164 @Clusterable(onMaster = true)
165 @Override
166 public List<SchedulerResponse> getScheduledJobs(String groupName)
167 throws SchedulerException {
168
169 _readLock.lock();
170
171 try {
172 return _schedulerEngine.getScheduledJobs(groupName);
173 }
174 finally {
175 _readLock.unlock();
176 }
177 }
178
179 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
180 @Override
181 public void pause(String groupName) throws SchedulerException {
182 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
183
184 _readLock.lock();
185
186 try {
187 if (memoryClusteredSlaveJob) {
188 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
189 }
190 else {
191 _schedulerEngine.pause(groupName);
192 }
193 }
194 finally {
195 _readLock.unlock();
196 }
197
198 setClusterableThreadLocal(groupName);
199 }
200
201 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
202 @Override
203 public void pause(String jobName, String groupName)
204 throws SchedulerException {
205
206 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
207
208 _readLock.lock();
209
210 try {
211 if (memoryClusteredSlaveJob) {
212 updateMemoryClusteredJob(
213 jobName, groupName, TriggerState.PAUSED);
214 }
215 else {
216 _schedulerEngine.pause(jobName, groupName);
217 }
218 }
219 finally {
220 _readLock.unlock();
221 }
222
223 setClusterableThreadLocal(groupName);
224 }
225
226 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
227 @Override
228 public void resume(String groupName) throws SchedulerException {
229 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
230
231 _readLock.lock();
232
233 try {
234 if (memoryClusteredSlaveJob) {
235 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
236 }
237 else {
238 _schedulerEngine.resume(groupName);
239 }
240 }
241 finally {
242 _readLock.unlock();
243 }
244
245 setClusterableThreadLocal(groupName);
246 }
247
248 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
249 @Override
250 public void resume(String jobName, String groupName)
251 throws SchedulerException {
252
253 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
254
255 _readLock.lock();
256
257 try {
258 if (memoryClusteredSlaveJob) {
259 updateMemoryClusteredJob(
260 jobName, groupName, TriggerState.NORMAL);
261 }
262 else {
263 _schedulerEngine.resume(jobName, groupName);
264 }
265 }
266 finally {
267 _readLock.unlock();
268 }
269
270 setClusterableThreadLocal(groupName);
271 }
272
273 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
274 @Override
275 public void schedule(
276 Trigger trigger, String description, String destinationName,
277 Message message)
278 throws SchedulerException {
279
280 String groupName = trigger.getGroupName();
281 String jobName = trigger.getJobName();
282
283 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
284
285 _readLock.lock();
286
287 try {
288 if (memoryClusteredSlaveJob) {
289 SchedulerResponse schedulerResponse = new SchedulerResponse();
290
291 schedulerResponse.setDescription(description);
292 schedulerResponse.setDestinationName(destinationName);
293 schedulerResponse.setGroupName(groupName);
294 schedulerResponse.setJobName(jobName);
295 schedulerResponse.setMessage(message);
296 schedulerResponse.setTrigger(trigger);
297
298 _memoryClusteredJobs.put(
299 getFullName(jobName, groupName),
300 new ObjectValuePair<SchedulerResponse, TriggerState>(
301 schedulerResponse, TriggerState.NORMAL));
302 }
303 else {
304 _schedulerEngine.schedule(
305 trigger, description, destinationName, message);
306 }
307 }
308 finally {
309 _readLock.unlock();
310 }
311
312 setClusterableThreadLocal(groupName);
313 }
314
315 @Override
316 public void setBeanIdentifier(String beanIdentifier) {
317 _beanIdentifier = beanIdentifier;
318 }
319
320 @Override
321 public void shutdown() throws SchedulerException {
322 _portalReady = false;
323
324 ClusterMasterExecutorUtil.
325 unregisterClusterMasterTokenTransitionListener(
326 _schedulerClusterMasterTokenTransitionListener);
327
328 _schedulerEngine.shutdown();
329 }
330
331 @Override
332 public void start() throws SchedulerException {
333 try {
334 if (!ClusterMasterExecutorUtil.isMaster()) {
335 initMemoryClusteredJobs();
336 }
337
338 _schedulerClusterMasterTokenTransitionListener =
339 new SchedulerClusterMasterTokenTransitionListener();
340
341 ClusterMasterExecutorUtil.
342 registerClusterMasterTokenTransitionListener(
343 _schedulerClusterMasterTokenTransitionListener);
344 }
345 catch (Exception e) {
346 throw new SchedulerException("Unable to initialize scheduler", e);
347 }
348
349 _schedulerEngine.start();
350
351 _portalReady = true;
352 }
353
354 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
355 @Override
356 public void suppressError(String jobName, String groupName)
357 throws SchedulerException {
358
359 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
360
361 if (!memoryClusteredSlaveJob) {
362 _readLock.lock();
363
364 try {
365 _schedulerEngine.suppressError(jobName, groupName);
366 }
367 finally {
368 _readLock.unlock();
369 }
370 }
371
372 setClusterableThreadLocal(groupName);
373 }
374
375 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
376 @Override
377 public void unschedule(String groupName) throws SchedulerException {
378 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
379
380 _readLock.lock();
381
382 try {
383 if (memoryClusteredSlaveJob) {
384 removeMemoryClusteredJobs(groupName);
385 }
386 else {
387 _schedulerEngine.unschedule(groupName);
388 }
389 }
390 finally {
391 _readLock.unlock();
392 }
393
394 setClusterableThreadLocal(groupName);
395 }
396
397 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
398 @Override
399 public void unschedule(String jobName, String groupName)
400 throws SchedulerException {
401
402 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
403
404 _readLock.lock();
405
406 try {
407 if (memoryClusteredSlaveJob) {
408 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
409 }
410 else {
411 _schedulerEngine.unschedule(jobName, groupName);
412 }
413 }
414 finally {
415 _readLock.unlock();
416 }
417
418 setClusterableThreadLocal(groupName);
419 }
420
421 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
422 @Override
423 public void update(Trigger trigger) throws SchedulerException {
424 String jobName = trigger.getJobName();
425 String groupName = trigger.getGroupName();
426
427 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
428
429 _readLock.lock();
430
431 try {
432 if (memoryClusteredSlaveJob) {
433 boolean updated = false;
434
435 for (ObjectValuePair<SchedulerResponse, TriggerState>
436 memoryClusteredJob : _memoryClusteredJobs.values()) {
437
438 SchedulerResponse schedulerResponse =
439 memoryClusteredJob.getKey();
440
441 if (jobName.equals(schedulerResponse.getJobName()) &&
442 groupName.equals(schedulerResponse.getGroupName())) {
443
444 schedulerResponse.setTrigger(trigger);
445
446 updated = true;
447
448 break;
449 }
450 }
451
452 if (!updated) {
453 throw new SchedulerException(
454 "Unable to update trigger for memory clustered job");
455 }
456 }
457 else {
458 _schedulerEngine.update(trigger);
459 }
460 }
461 finally {
462 _readLock.unlock();
463 }
464
465 setClusterableThreadLocal(groupName);
466 }
467
468 protected String getFullName(String jobName, String groupName) {
469 return groupName.concat(StringPool.PERIOD).concat(jobName);
470 }
471
472 protected void initMemoryClusteredJobs() throws Exception {
473 MethodHandler methodHandler = new MethodHandler(
474 _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
475
476 Future<List<SchedulerResponse>> future =
477 ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
478
479 List<SchedulerResponse> schedulerResponses = future.get(
480 PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
481 TimeUnit.SECONDS);
482
483 for (SchedulerResponse schedulerResponse : schedulerResponses) {
484 Trigger oldTrigger = schedulerResponse.getTrigger();
485
486 String jobName = schedulerResponse.getJobName();
487 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
488 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
489
490 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
491 oldTrigger.getTriggerType(), jobName, groupName,
492 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
493 oldTrigger.getTriggerContent());
494
495 schedulerResponse.setTrigger(newTrigger);
496
497 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
498 schedulerResponse);
499
500 Message message = schedulerResponse.getMessage();
501
502 message.remove(JOB_STATE);
503
504 _memoryClusteredJobs.put(
505 getFullName(jobName, groupName),
506 new ObjectValuePair<SchedulerResponse, TriggerState>(
507 schedulerResponse, triggerState));
508 }
509 }
510
511 protected boolean isMemoryClusteredSlaveJob(String groupName)
512 throws SchedulerException {
513
514 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
515 groupName);
516
517 StorageType storageType = objectValuePair.getValue();
518
519 if ((storageType != StorageType.MEMORY_CLUSTERED) ||
520 ClusterMasterExecutorUtil.isMaster()) {
521
522 return false;
523 }
524
525 return true;
526 }
527
528 protected void removeMemoryClusteredJobs(String groupName) {
529 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
530 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
531
532 Iterator
533 <Map.Entry<String,
534 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
535 memoryClusteredJobs.iterator();
536
537 while (itr.hasNext()) {
538 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
539 entry = itr.next();
540
541 ObjectValuePair<SchedulerResponse, TriggerState>
542 memoryClusteredJob = entry.getValue();
543
544 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
545
546 if (groupName.equals(schedulerResponse.getGroupName())) {
547 itr.remove();
548 }
549 }
550 }
551
552 protected ObjectValuePair<String, StorageType> resolveGroupName(
553 String groupName) {
554
555 int index = groupName.indexOf(CharPool.POUND);
556
557 String storageTypeString = groupName.substring(0, index);
558
559 StorageType storageType = StorageType.valueOf(storageTypeString);
560
561 String orginalGroupName = groupName.substring(index + 1);
562
563 return new ObjectValuePair<String, StorageType>(
564 orginalGroupName, storageType);
565 }
566
567 protected void setClusterableThreadLocal(String groupName) {
568 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
569 groupName);
570
571 ClusterableContextThreadLocal.putThreadLocalContext(
572 STORAGE_TYPE, objectValuePair.getValue());
573 ClusterableContextThreadLocal.putThreadLocalContext(
574 _PORTAL_READY, _portalReady);
575
576 boolean pluginReady = true;
577
578 if (PluginContextLifecycleThreadLocal.isInitializing() ||
579 PluginContextLifecycleThreadLocal.isDestroying()) {
580
581 pluginReady = false;
582 }
583
584 ClusterableContextThreadLocal.putThreadLocalContext(
585 _PLUGIN_READY, pluginReady);
586 }
587
588 protected void updateMemoryClusteredJob(
589 String jobName, String groupName, TriggerState triggerState) {
590
591 ObjectValuePair<SchedulerResponse, TriggerState>
592 memoryClusteredJob = _memoryClusteredJobs.get(
593 getFullName(jobName, groupName));
594
595 if (memoryClusteredJob != null) {
596 memoryClusteredJob.setValue(triggerState);
597 }
598 }
599
600 protected void updateMemoryClusteredJobs(
601 String groupName, TriggerState triggerState) {
602
603 for (ObjectValuePair<SchedulerResponse, TriggerState>
604 memoryClusteredJob : _memoryClusteredJobs.values()) {
605
606 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
607
608 if (groupName.equals(schedulerResponse.getGroupName())) {
609 memoryClusteredJob.setValue(triggerState);
610 }
611 }
612 }
613
614 @BeanReference(
615 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
616 protected SchedulerEngine schedulerEngine;
617
618 private static final String _PLUGIN_READY = "plugin.ready";
619
620 private static final String _PORTAL_READY = "portal.ready";
621
622 private static Log _log = LogFactoryUtil.getLog(
623 ClusterSchedulerEngine.class);
624
625 private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
626 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
627
628 private String _beanIdentifier;
629 private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
630 _memoryClusteredJobs = new ConcurrentHashMap
631 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
632 private boolean _portalReady;
633 private final java.util.concurrent.locks.Lock _readLock;
634 private ClusterMasterTokenTransitionListener
635 _schedulerClusterMasterTokenTransitionListener;
636 private final SchedulerEngine _schedulerEngine;
637 private final java.util.concurrent.locks.Lock _writeLock;
638
639 private static class SchedulerClusterInvokeAcceptor
640 implements ClusterInvokeAcceptor {
641
642 @Override
643 public boolean accept(Map<String, Serializable> context) {
644 if (ClusterInvokeThreadLocal.isEnabled()) {
645 return false;
646 }
647
648 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
649 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
650 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
651
652 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
653 !pluginReady) {
654
655 return false;
656 }
657
658 return true;
659 }
660
661 }
662
663 private class SchedulerClusterMasterTokenTransitionListener
664 extends BaseClusterMasterTokenTransitionListener {
665
666 @Override
667 protected void doMasterTokenAcquired() throws Exception {
668 boolean forceSync = ProxyModeThreadLocal.isForceSync();
669
670 ProxyModeThreadLocal.setForceSync(true);
671
672 _writeLock.lock();
673
674 try {
675 for (ObjectValuePair<SchedulerResponse, TriggerState>
676 memoryClusteredJob : _memoryClusteredJobs.values()) {
677
678 SchedulerResponse schedulerResponse =
679 memoryClusteredJob.getKey();
680
681 _schedulerEngine.schedule(
682 schedulerResponse.getTrigger(),
683 schedulerResponse.getDescription(),
684 schedulerResponse.getDestinationName(),
685 schedulerResponse.getMessage());
686
687 TriggerState triggerState = memoryClusteredJob.getValue();
688
689 if (triggerState.equals(TriggerState.PAUSED)) {
690 _schedulerEngine.pause(
691 schedulerResponse.getJobName(),
692 schedulerResponse.getGroupName());
693 }
694 }
695
696 _memoryClusteredJobs.clear();
697
698 if (_log.isInfoEnabled()) {
699 _log.info("MEMORY_CLUSTERED jobs are running on this node");
700 }
701 }
702 finally {
703 ProxyModeThreadLocal.setForceSync(forceSync);
704
705 _writeLock.unlock();
706 }
707 }
708
709 @Override
710 protected void doMasterTokenReleased() throws Exception {
711 _writeLock.lock();
712
713 try {
714 for (SchedulerResponse schedulerResponse :
715 _schedulerEngine.getScheduledJobs()) {
716
717 if (StorageType.MEMORY_CLUSTERED ==
718 schedulerResponse.getStorageType()) {
719
720 String groupName =
721 SchedulerEngineHelperUtil.namespaceGroupName(
722 schedulerResponse.getGroupName(),
723 StorageType.MEMORY_CLUSTERED);
724
725 _schedulerEngine.delete(
726 schedulerResponse.getJobName(), groupName);
727 }
728 }
729
730 initMemoryClusteredJobs();
731
732 if (_log.isInfoEnabled()) {
733 _log.info(
734 "MEMORY_CLUSTERED jobs stop running on this node");
735 }
736 }
737 finally {
738 _writeLock.unlock();
739 }
740 }
741
742 }
743
744 }