001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018    import com.liferay.portal.kernel.bean.BeanReference;
019    import com.liferay.portal.kernel.bean.IdentifiableBean;
020    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
021    import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
022    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
023    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
024    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
025    import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
027    import com.liferay.portal.kernel.cluster.ClusterRequest;
028    import com.liferay.portal.kernel.cluster.Clusterable;
029    import com.liferay.portal.kernel.log.Log;
030    import com.liferay.portal.kernel.log.LogFactoryUtil;
031    import com.liferay.portal.kernel.messaging.Message;
032    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
033    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
035    import com.liferay.portal.kernel.scheduler.SchedulerException;
036    import com.liferay.portal.kernel.scheduler.StorageType;
037    import com.liferay.portal.kernel.scheduler.Trigger;
038    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039    import com.liferay.portal.kernel.scheduler.TriggerState;
040    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
041    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
042    import com.liferay.portal.kernel.util.CharPool;
043    import com.liferay.portal.kernel.util.MethodHandler;
044    import com.liferay.portal.kernel.util.MethodKey;
045    import com.liferay.portal.kernel.util.ObjectValuePair;
046    import com.liferay.portal.kernel.util.StringPool;
047    import com.liferay.portal.scheduler.quartz.QuartzTriggerHelperUtil;
048    import com.liferay.portal.spring.aop.ServiceBeanAopProxy;
049    import com.liferay.portal.util.PropsValues;
050    
051    import java.io.Serializable;
052    
053    import java.util.Date;
054    import java.util.Iterator;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Set;
058    import java.util.concurrent.ConcurrentHashMap;
059    import java.util.concurrent.Future;
060    import java.util.concurrent.TimeUnit;
061    import java.util.concurrent.locks.ReadWriteLock;
062    import java.util.concurrent.locks.ReentrantReadWriteLock;
063    
064    import org.springframework.aop.TargetSource;
065    import org.springframework.aop.framework.AdvisedSupport;
066    
067    /**
068     * @author Tina Tian
069     */
070    public class ClusterSchedulerEngine
071            implements IdentifiableBean, SchedulerEngine {
072    
073            public static SchedulerEngine createClusterSchedulerEngine(
074                    SchedulerEngine schedulerEngine) {
075    
076                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
077                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
078                    }
079    
080                    return schedulerEngine;
081            }
082    
083            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
084                    _schedulerEngine = schedulerEngine;
085    
086                    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
087    
088                    _readLock = readWriteLock.readLock();
089                    _writeLock = readWriteLock.writeLock();
090            }
091    
092            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
093            @Override
094            public void delete(String groupName) throws SchedulerException {
095                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
096    
097                    _readLock.lock();
098    
099                    try {
100                            if (memoryClusteredSlaveJob) {
101                                    removeMemoryClusteredJobs(groupName);
102                            }
103                            else {
104                                    _schedulerEngine.delete(groupName);
105                            }
106                    }
107                    finally {
108                            _readLock.unlock();
109                    }
110    
111                    setClusterableThreadLocal(groupName);
112            }
113    
114            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
115            @Override
116            public void delete(String jobName, String groupName)
117                    throws SchedulerException {
118    
119                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
120    
121                    _readLock.lock();
122    
123                    try {
124                            if (memoryClusteredSlaveJob) {
125                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
126                            }
127                            else {
128                                    _schedulerEngine.delete(jobName, groupName);
129                            }
130                    }
131                    finally {
132                            _readLock.unlock();
133                    }
134    
135                    setClusterableThreadLocal(groupName);
136            }
137    
138            @Override
139            public String getBeanIdentifier() {
140                    return _beanIdentifier;
141            }
142    
143            @Clusterable(onMaster = true)
144            @Override
145            public SchedulerResponse getScheduledJob(String jobName, String groupName)
146                    throws SchedulerException {
147    
148                    _readLock.lock();
149    
150                    try {
151                            return _schedulerEngine.getScheduledJob(jobName, groupName);
152                    }
153                    finally {
154                            _readLock.unlock();
155                    }
156            }
157    
158            @Clusterable(onMaster = true)
159            @Override
160            public List<SchedulerResponse> getScheduledJobs()
161                    throws SchedulerException {
162    
163                    _readLock.lock();
164    
165                    try {
166                            return _schedulerEngine.getScheduledJobs();
167                    }
168                    finally {
169                            _readLock.unlock();
170                    }
171            }
172    
173            @Clusterable(onMaster = true)
174            @Override
175            public List<SchedulerResponse> getScheduledJobs(String groupName)
176                    throws SchedulerException {
177    
178                    _readLock.lock();
179    
180                    try {
181                            return _schedulerEngine.getScheduledJobs(groupName);
182                    }
183                    finally {
184                            _readLock.unlock();
185                    }
186            }
187    
188            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
189            @Override
190            public void pause(String groupName) throws SchedulerException {
191                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
192    
193                    _readLock.lock();
194    
195                    try {
196                            if (memoryClusteredSlaveJob) {
197                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
198                            }
199                            else {
200                                    _schedulerEngine.pause(groupName);
201                            }
202                    }
203                    finally {
204                            _readLock.unlock();
205                    }
206    
207                    setClusterableThreadLocal(groupName);
208            }
209    
210            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
211            @Override
212            public void pause(String jobName, String groupName)
213                    throws SchedulerException {
214    
215                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
216    
217                    _readLock.lock();
218    
219                    try {
220                            if (memoryClusteredSlaveJob) {
221                                    updateMemoryClusteredJob(
222                                            jobName, groupName, TriggerState.PAUSED);
223                            }
224                            else {
225                                    _schedulerEngine.pause(jobName, groupName);
226                            }
227                    }
228                    finally {
229                            _readLock.unlock();
230                    }
231    
232                    setClusterableThreadLocal(groupName);
233            }
234    
235            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
236            @Override
237            public void resume(String groupName) throws SchedulerException {
238                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
239    
240                    _readLock.lock();
241    
242                    try {
243                            if (memoryClusteredSlaveJob) {
244                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
245                            }
246                            else {
247                                    _schedulerEngine.resume(groupName);
248                            }
249                    }
250                    finally {
251                            _readLock.unlock();
252                    }
253    
254                    setClusterableThreadLocal(groupName);
255            }
256    
257            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
258            @Override
259            public void resume(String jobName, String groupName)
260                    throws SchedulerException {
261    
262                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
263    
264                    _readLock.lock();
265    
266                    try {
267                            if (memoryClusteredSlaveJob) {
268                                    updateMemoryClusteredJob(
269                                            jobName, groupName, TriggerState.NORMAL);
270                            }
271                            else {
272                                    _schedulerEngine.resume(jobName, groupName);
273                            }
274                    }
275                    finally {
276                            _readLock.unlock();
277                    }
278    
279                    setClusterableThreadLocal(groupName);
280            }
281    
282            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
283            @Override
284            public void schedule(
285                            Trigger trigger, String description, String destinationName,
286                            Message message)
287                    throws SchedulerException {
288    
289                    String groupName = trigger.getGroupName();
290    
291                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
292                            groupName);
293    
294                    StorageType storageType = objectValuePair.getValue();
295    
296                    _readLock.lock();
297    
298                    try {
299                            if (storageType == StorageType.MEMORY_CLUSTERED) {
300                                    if (ClusterMasterExecutorUtil.isMaster()) {
301                                            _schedulerEngine.schedule(
302                                                    trigger, description, destinationName, message);
303    
304                                            if (_portalReady) {
305                                                    _notifySlave(
306                                                            trigger, description, destinationName, message);
307                                            }
308                                    }
309                            }
310                            else {
311                                    _schedulerEngine.schedule(
312                                            trigger, description, destinationName, message);
313                            }
314                    }
315                    finally {
316                            _readLock.unlock();
317                    }
318    
319                    setClusterableThreadLocal(groupName);
320            }
321    
322            @Override
323            public void setBeanIdentifier(String beanIdentifier) {
324                    _beanIdentifier = beanIdentifier;
325            }
326    
327            @Override
328            public void shutdown() throws SchedulerException {
329                    _portalReady = false;
330    
331                    ClusterMasterExecutorUtil.
332                            unregisterClusterMasterTokenTransitionListener(
333                                    _schedulerClusterMasterTokenTransitionListener);
334    
335                    _schedulerEngine.shutdown();
336            }
337    
338            @Override
339            public void start() throws SchedulerException {
340                    try {
341                            if (!ClusterMasterExecutorUtil.isMaster()) {
342                                    initMemoryClusteredJobs();
343                            }
344    
345                            _schedulerClusterMasterTokenTransitionListener =
346                                    new SchedulerClusterMasterTokenTransitionListener();
347    
348                            ClusterMasterExecutorUtil.
349                                    registerClusterMasterTokenTransitionListener(
350                                            _schedulerClusterMasterTokenTransitionListener);
351                    }
352                    catch (Exception e) {
353                            throw new SchedulerException("Unable to initialize scheduler", e);
354                    }
355    
356                    _schedulerEngine.start();
357    
358                    _portalReady = true;
359            }
360    
361            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
362            @Override
363            public void suppressError(String jobName, String groupName)
364                    throws SchedulerException {
365    
366                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
367    
368                    if (!memoryClusteredSlaveJob) {
369                            _readLock.lock();
370    
371                            try {
372                                    _schedulerEngine.suppressError(jobName, groupName);
373                            }
374                            finally {
375                                    _readLock.unlock();
376                            }
377                    }
378    
379                    setClusterableThreadLocal(groupName);
380            }
381    
382            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
383            @Override
384            public void unschedule(String groupName) throws SchedulerException {
385                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
386    
387                    _readLock.lock();
388    
389                    try {
390                            if (memoryClusteredSlaveJob) {
391                                    removeMemoryClusteredJobs(groupName);
392                            }
393                            else {
394                                    _schedulerEngine.unschedule(groupName);
395                            }
396                    }
397                    finally {
398                            _readLock.unlock();
399                    }
400    
401                    setClusterableThreadLocal(groupName);
402            }
403    
404            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
405            @Override
406            public void unschedule(String jobName, String groupName)
407                    throws SchedulerException {
408    
409                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
410    
411                    _readLock.lock();
412    
413                    try {
414                            if (memoryClusteredSlaveJob) {
415                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
416                            }
417                            else {
418                                    _schedulerEngine.unschedule(jobName, groupName);
419                            }
420                    }
421                    finally {
422                            _readLock.unlock();
423                    }
424    
425                    setClusterableThreadLocal(groupName);
426            }
427    
428            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
429            @Override
430            public void update(Trigger trigger) throws SchedulerException {
431                    String jobName = trigger.getJobName();
432                    String groupName = trigger.getGroupName();
433    
434                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
435    
436                    _readLock.lock();
437    
438                    try {
439                            if (memoryClusteredSlaveJob) {
440                                    boolean updated = false;
441    
442                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
443                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
444    
445                                            SchedulerResponse schedulerResponse =
446                                                    memoryClusteredJob.getKey();
447    
448                                            if (jobName.equals(schedulerResponse.getJobName()) &&
449                                                    groupName.equals(schedulerResponse.getGroupName())) {
450    
451                                                    schedulerResponse.setTrigger(trigger);
452    
453                                                    updated = true;
454    
455                                                    break;
456                                            }
457                                    }
458    
459                                    if (!updated) {
460                                            throw new SchedulerException(
461                                                    "Unable to update trigger for memory clustered job");
462                                    }
463                            }
464                            else {
465                                    _schedulerEngine.update(trigger);
466                            }
467                    }
468                    finally {
469                            _readLock.unlock();
470                    }
471    
472                    setClusterableThreadLocal(groupName);
473            }
474    
475            protected void addMemoryClusteredJob(SchedulerResponse schedulerResponse)
476                    throws SchedulerException {
477    
478                    Trigger oldTrigger = schedulerResponse.getTrigger();
479    
480                    String jobName = schedulerResponse.getJobName();
481                    String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
482                            schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
483    
484                    Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
485                            oldTrigger.getTriggerType(), jobName, groupName,
486                            oldTrigger.getStartDate(), oldTrigger.getEndDate(),
487                            oldTrigger.getTriggerContent());
488    
489                    schedulerResponse.setTrigger(newTrigger);
490    
491                    TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
492                            schedulerResponse);
493    
494                    Message message = schedulerResponse.getMessage();
495    
496                    message.remove(JOB_STATE);
497    
498                    _memoryClusteredJobs.put(
499                            getFullName(jobName, groupName),
500                            new ObjectValuePair<SchedulerResponse, TriggerState>(
501                                    schedulerResponse, triggerState));
502            }
503    
504            protected String getFullName(String jobName, String groupName) {
505                    return groupName.concat(StringPool.PERIOD).concat(jobName);
506            }
507    
508            protected void initMemoryClusteredJobs() throws Exception {
509                    MethodHandler methodHandler = new MethodHandler(
510                            _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
511    
512                    Future<List<SchedulerResponse>> future =
513                            ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
514    
515                    List<SchedulerResponse> schedulerResponses = future.get(
516                            PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
517                            TimeUnit.SECONDS);
518    
519                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
520                            addMemoryClusteredJob(schedulerResponse);
521                    }
522            }
523    
524            protected boolean isMemoryClusteredSlaveJob(String groupName)
525                    throws SchedulerException {
526    
527                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
528                            groupName);
529    
530                    StorageType storageType = objectValuePair.getValue();
531    
532                    if ((storageType != StorageType.MEMORY_CLUSTERED) ||
533                            ClusterMasterExecutorUtil.isMaster()) {
534    
535                            return false;
536                    }
537    
538                    return true;
539            }
540    
541            protected void removeMemoryClusteredJobs(String groupName) {
542                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
543                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
544    
545                    Iterator
546                            <Map.Entry<String,
547                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
548                                            memoryClusteredJobs.iterator();
549    
550                    while (itr.hasNext()) {
551                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
552                                    entry = itr.next();
553    
554                            ObjectValuePair<SchedulerResponse, TriggerState>
555                                    memoryClusteredJob = entry.getValue();
556    
557                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
558    
559                            if (groupName.equals(schedulerResponse.getGroupName())) {
560                                    itr.remove();
561                            }
562                    }
563            }
564    
565            protected ObjectValuePair<String, StorageType> resolveGroupName(
566                    String groupName) {
567    
568                    int index = groupName.indexOf(CharPool.POUND);
569    
570                    String storageTypeString = groupName.substring(0, index);
571    
572                    StorageType storageType = StorageType.valueOf(storageTypeString);
573    
574                    String orginalGroupName = groupName.substring(index + 1);
575    
576                    return new ObjectValuePair<String, StorageType>(
577                            orginalGroupName, storageType);
578            }
579    
580            protected void setClusterableThreadLocal(String groupName) {
581                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
582                            groupName);
583    
584                    ClusterableContextThreadLocal.putThreadLocalContext(
585                            STORAGE_TYPE, objectValuePair.getValue());
586                    ClusterableContextThreadLocal.putThreadLocalContext(
587                            _PORTAL_READY, _portalReady);
588    
589                    boolean pluginReady = true;
590    
591                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
592                            PluginContextLifecycleThreadLocal.isDestroying()) {
593    
594                            pluginReady = false;
595                    }
596    
597                    ClusterableContextThreadLocal.putThreadLocalContext(
598                            _PLUGIN_READY, pluginReady);
599            }
600    
601            protected void updateMemoryClusteredJob(
602                    String jobName, String groupName, TriggerState triggerState) {
603    
604                    ObjectValuePair<SchedulerResponse, TriggerState>
605                            memoryClusteredJob = _memoryClusteredJobs.get(
606                                    getFullName(jobName, groupName));
607    
608                    if (memoryClusteredJob != null) {
609                            memoryClusteredJob.setValue(triggerState);
610                    }
611            }
612    
613            protected void updateMemoryClusteredJobs(
614                    String groupName, TriggerState triggerState) {
615    
616                    for (ObjectValuePair<SchedulerResponse, TriggerState>
617                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
618    
619                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
620    
621                            if (groupName.equals(schedulerResponse.getGroupName())) {
622                                    memoryClusteredJob.setValue(triggerState);
623                            }
624                    }
625            }
626    
627            private static void _addMemoryClusteredJob(
628                            SchedulerResponse schedulerResponse, String beanIdentifier)
629                    throws Exception {
630    
631                    Object serviceBean = PortalBeanLocatorUtil.locate(beanIdentifier);
632    
633                    AdvisedSupport advisedSupport = ServiceBeanAopProxy.getAdvisedSupport(
634                            serviceBean);
635    
636                    TargetSource targetSource = advisedSupport.getTargetSource();
637    
638                    ClusterSchedulerEngine clusterSchedulerEngine =
639                            (ClusterSchedulerEngine)targetSource.getTarget();
640    
641                    if (!clusterSchedulerEngine._portalReady) {
642                            return;
643                    }
644    
645                    String jobName = schedulerResponse.getJobName();
646                    String groupName = schedulerResponse.getGroupName();
647    
648                    java.util.concurrent.locks.Lock readLock =
649                            clusterSchedulerEngine._readLock;
650    
651                    readLock.lock();
652    
653                    try {
654                            Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
655                                    memoryClusteredJobs =
656                                            clusterSchedulerEngine._memoryClusteredJobs;
657    
658                            memoryClusteredJobs.put(
659                                    clusterSchedulerEngine.getFullName(jobName, groupName),
660                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
661                                            schedulerResponse, TriggerState.NORMAL));
662    
663                            if (_log.isInfoEnabled()) {
664                                    _log.info(
665                                            "Receive notification from master, add memory clustered " +
666                                                    "job " + schedulerResponse);
667                            }
668                    }
669                    finally {
670                            readLock.unlock();
671                    }
672            }
673    
674            private void _notifySlave(
675                            Trigger trigger, String description, String destinationName,
676                            Message message) {
677    
678                    String groupName = trigger.getGroupName();
679                    String jobName = trigger.getJobName();
680    
681                    SchedulerResponse schedulerResponse = new SchedulerResponse();
682    
683                    schedulerResponse.setDescription(description);
684                    schedulerResponse.setDestinationName(destinationName);
685                    schedulerResponse.setGroupName(groupName);
686                    schedulerResponse.setJobName(jobName);
687                    schedulerResponse.setTrigger(trigger);
688                    schedulerResponse.setMessage(message);
689    
690                    try {
691                            MethodHandler methodHandler = new MethodHandler(
692                                    _addMemoryClusteredJobMethodKey, schedulerResponse,
693                                    _beanIdentifier);
694    
695                            ClusterRequest clusterRequest =
696                                    ClusterRequest.createMulticastRequest(methodHandler, true);
697    
698                            clusterRequest.setFireAndForget(true);
699    
700                            ClusterExecutorUtil.execute(clusterRequest);
701                    }
702                    catch (Throwable t) {
703                            _log.error("Unable to notify slave", t);
704                    }
705            }
706    
707            @BeanReference(
708                    name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
709            protected SchedulerEngine schedulerEngine;
710    
711            private static final String _PLUGIN_READY = "plugin.ready";
712    
713            private static final String _PORTAL_READY = "portal.ready";
714    
715            private static Log _log = LogFactoryUtil.getLog(
716                    ClusterSchedulerEngine.class);
717    
718            private static final MethodKey _addMemoryClusteredJobMethodKey =
719                    new MethodKey(
720                            ClusterSchedulerEngine.class, "_addMemoryClusteredJob",
721                            SchedulerResponse.class, String.class);
722            private static final MethodKey _getScheduledJobMethodKey = new MethodKey(
723                    SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
724                    String.class, StorageType.class);
725            private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
726                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
727    
728            private String _beanIdentifier;
729            private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
730                    _memoryClusteredJobs = new ConcurrentHashMap
731                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
732            private boolean _portalReady;
733            private final java.util.concurrent.locks.Lock _readLock;
734            private ClusterMasterTokenTransitionListener
735                    _schedulerClusterMasterTokenTransitionListener;
736            private final SchedulerEngine _schedulerEngine;
737            private final java.util.concurrent.locks.Lock _writeLock;
738    
739            private static class SchedulerClusterInvokeAcceptor
740                    implements ClusterInvokeAcceptor {
741    
742                    @Override
743                    public boolean accept(Map<String, Serializable> context) {
744                            if (ClusterInvokeThreadLocal.isEnabled()) {
745                                    return false;
746                            }
747    
748                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
749                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
750                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
751    
752                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
753                                    !pluginReady) {
754    
755                                    return false;
756                            }
757    
758                            return true;
759                    }
760    
761            }
762    
763            private class SchedulerClusterMasterTokenTransitionListener
764                    extends BaseClusterMasterTokenTransitionListener {
765    
766                    @Override
767                    protected void doMasterTokenAcquired() throws Exception {
768                            boolean forceSync = ProxyModeThreadLocal.isForceSync();
769    
770                            ProxyModeThreadLocal.setForceSync(true);
771    
772                            _writeLock.lock();
773    
774                            try {
775                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
776                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
777    
778                                            SchedulerResponse schedulerResponse =
779                                                    memoryClusteredJob.getKey();
780    
781                                            Trigger oldTrigger = schedulerResponse.getTrigger();
782    
783                                            Date startDate = QuartzTriggerHelperUtil.getFireTimeAfter(
784                                                    oldTrigger, new Date());
785    
786                                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
787                                                    oldTrigger.getTriggerType(), oldTrigger.getJobName(),
788                                                    oldTrigger.getGroupName(), startDate,
789                                                    oldTrigger.getEndDate(),
790                                                    oldTrigger.getTriggerContent());
791    
792                                            _schedulerEngine.schedule(
793                                                    newTrigger, schedulerResponse.getDescription(),
794                                                    schedulerResponse.getDestinationName(),
795                                                    schedulerResponse.getMessage());
796    
797                                            TriggerState triggerState = memoryClusteredJob.getValue();
798    
799                                            if (triggerState.equals(TriggerState.PAUSED)) {
800                                                    _schedulerEngine.pause(
801                                                            schedulerResponse.getJobName(),
802                                                            schedulerResponse.getGroupName());
803                                            }
804                                    }
805    
806                                    _memoryClusteredJobs.clear();
807    
808                                    if (_log.isInfoEnabled()) {
809                                            _log.info("MEMORY_CLUSTERED jobs are running on this node");
810                                    }
811                            }
812                            finally {
813                                    ProxyModeThreadLocal.setForceSync(forceSync);
814    
815                                    _writeLock.unlock();
816                            }
817                    }
818    
819                    @Override
820                    protected void doMasterTokenReleased() throws Exception {
821                            _writeLock.lock();
822    
823                            try {
824                                    for (SchedulerResponse schedulerResponse :
825                                                    _schedulerEngine.getScheduledJobs()) {
826    
827                                            if (StorageType.MEMORY_CLUSTERED ==
828                                                            schedulerResponse.getStorageType()) {
829    
830                                                    String groupName =
831                                                            SchedulerEngineHelperUtil.namespaceGroupName(
832                                                                    schedulerResponse.getGroupName(),
833                                                                    StorageType.MEMORY_CLUSTERED);
834    
835                                                    _schedulerEngine.delete(
836                                                            schedulerResponse.getJobName(), groupName);
837                                            }
838                                    }
839    
840                                    initMemoryClusteredJobs();
841    
842                                    if (_log.isInfoEnabled()) {
843                                            _log.info(
844                                                    "MEMORY_CLUSTERED jobs stop running on this node");
845                                    }
846                            }
847                            finally {
848                                    _writeLock.unlock();
849                            }
850                    }
851    
852            }
853    
854    }