001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018    import com.liferay.portal.kernel.bean.BeanReference;
019    import com.liferay.portal.kernel.bean.IdentifiableBean;
020    import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
021    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
022    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
023    import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
025    import com.liferay.portal.kernel.cluster.Clusterable;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.messaging.Message;
029    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
030    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
031    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
032    import com.liferay.portal.kernel.scheduler.SchedulerException;
033    import com.liferay.portal.kernel.scheduler.StorageType;
034    import com.liferay.portal.kernel.scheduler.Trigger;
035    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
036    import com.liferay.portal.kernel.scheduler.TriggerState;
037    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
038    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
039    import com.liferay.portal.kernel.util.CharPool;
040    import com.liferay.portal.kernel.util.MethodHandler;
041    import com.liferay.portal.kernel.util.MethodKey;
042    import com.liferay.portal.kernel.util.ObjectValuePair;
043    import com.liferay.portal.kernel.util.StringPool;
044    import com.liferay.portal.util.PropsValues;
045    
046    import java.io.Serializable;
047    
048    import java.util.Iterator;
049    import java.util.List;
050    import java.util.Map;
051    import java.util.Set;
052    import java.util.concurrent.ConcurrentHashMap;
053    import java.util.concurrent.Future;
054    import java.util.concurrent.TimeUnit;
055    import java.util.concurrent.locks.ReadWriteLock;
056    import java.util.concurrent.locks.ReentrantReadWriteLock;
057    
058    /**
059     * @author Tina Tian
060     */
061    public class ClusterSchedulerEngine
062            implements IdentifiableBean, SchedulerEngine {
063    
064            public static SchedulerEngine createClusterSchedulerEngine(
065                    SchedulerEngine schedulerEngine) {
066    
067                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
068                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
069                    }
070    
071                    return schedulerEngine;
072            }
073    
074            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
075                    _schedulerEngine = schedulerEngine;
076    
077                    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
078    
079                    _readLock = readWriteLock.readLock();
080                    _writeLock = readWriteLock.writeLock();
081            }
082    
083            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
084            @Override
085            public void delete(String groupName) throws SchedulerException {
086                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
087    
088                    _readLock.lock();
089    
090                    try {
091                            if (memoryClusteredSlaveJob) {
092                                    removeMemoryClusteredJobs(groupName);
093                            }
094                            else {
095                                    _schedulerEngine.delete(groupName);
096                            }
097                    }
098                    finally {
099                            _readLock.unlock();
100                    }
101    
102                    setClusterableThreadLocal(groupName);
103            }
104    
105            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
106            @Override
107            public void delete(String jobName, String groupName)
108                    throws SchedulerException {
109    
110                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
111    
112                    _readLock.lock();
113    
114                    try {
115                            if (memoryClusteredSlaveJob) {
116                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
117                            }
118                            else {
119                                    _schedulerEngine.delete(jobName, groupName);
120                            }
121                    }
122                    finally {
123                            _readLock.unlock();
124                    }
125    
126                    setClusterableThreadLocal(groupName);
127            }
128    
129            @Override
130            public String getBeanIdentifier() {
131                    return _beanIdentifier;
132            }
133    
134            @Clusterable(onMaster = true)
135            @Override
136            public SchedulerResponse getScheduledJob(String jobName, String groupName)
137                    throws SchedulerException {
138    
139                    _readLock.lock();
140    
141                    try {
142                            return _schedulerEngine.getScheduledJob(jobName, groupName);
143                    }
144                    finally {
145                            _readLock.unlock();
146                    }
147            }
148    
149            @Clusterable(onMaster = true)
150            @Override
151            public List<SchedulerResponse> getScheduledJobs()
152                    throws SchedulerException {
153    
154                    _readLock.lock();
155    
156                    try {
157                            return _schedulerEngine.getScheduledJobs();
158                    }
159                    finally {
160                            _readLock.unlock();
161                    }
162            }
163    
164            @Clusterable(onMaster = true)
165            @Override
166            public List<SchedulerResponse> getScheduledJobs(String groupName)
167                    throws SchedulerException {
168    
169                    _readLock.lock();
170    
171                    try {
172                            return _schedulerEngine.getScheduledJobs(groupName);
173                    }
174                    finally {
175                            _readLock.unlock();
176                    }
177            }
178    
179            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
180            @Override
181            public void pause(String groupName) throws SchedulerException {
182                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
183    
184                    _readLock.lock();
185    
186                    try {
187                            if (memoryClusteredSlaveJob) {
188                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
189                            }
190                            else {
191                                    _schedulerEngine.pause(groupName);
192                            }
193                    }
194                    finally {
195                            _readLock.unlock();
196                    }
197    
198                    setClusterableThreadLocal(groupName);
199            }
200    
201            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
202            @Override
203            public void pause(String jobName, String groupName)
204                    throws SchedulerException {
205    
206                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
207    
208                    _readLock.lock();
209    
210                    try {
211                            if (memoryClusteredSlaveJob) {
212                                    updateMemoryClusteredJob(
213                                            jobName, groupName, TriggerState.PAUSED);
214                            }
215                            else {
216                                    _schedulerEngine.pause(jobName, groupName);
217                            }
218                    }
219                    finally {
220                            _readLock.unlock();
221                    }
222    
223                    setClusterableThreadLocal(groupName);
224            }
225    
226            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
227            @Override
228            public void resume(String groupName) throws SchedulerException {
229                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
230    
231                    _readLock.lock();
232    
233                    try {
234                            if (memoryClusteredSlaveJob) {
235                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
236                            }
237                            else {
238                                    _schedulerEngine.resume(groupName);
239                            }
240                    }
241                    finally {
242                            _readLock.unlock();
243                    }
244    
245                    setClusterableThreadLocal(groupName);
246            }
247    
248            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
249            @Override
250            public void resume(String jobName, String groupName)
251                    throws SchedulerException {
252    
253                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
254    
255                    _readLock.lock();
256    
257                    try {
258                            if (memoryClusteredSlaveJob) {
259                                    updateMemoryClusteredJob(
260                                            jobName, groupName, TriggerState.NORMAL);
261                            }
262                            else {
263                                    _schedulerEngine.resume(jobName, groupName);
264                            }
265                    }
266                    finally {
267                            _readLock.unlock();
268                    }
269    
270                    setClusterableThreadLocal(groupName);
271            }
272    
273            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
274            @Override
275            public void schedule(
276                            Trigger trigger, String description, String destinationName,
277                            Message message)
278                    throws SchedulerException {
279    
280                    String groupName = trigger.getGroupName();
281                    String jobName = trigger.getJobName();
282    
283                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
284    
285                    _readLock.lock();
286    
287                    try {
288                            if (memoryClusteredSlaveJob) {
289                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
290    
291                                    schedulerResponse.setDescription(description);
292                                    schedulerResponse.setDestinationName(destinationName);
293                                    schedulerResponse.setGroupName(groupName);
294                                    schedulerResponse.setJobName(jobName);
295                                    schedulerResponse.setMessage(message);
296                                    schedulerResponse.setTrigger(trigger);
297    
298                                    _memoryClusteredJobs.put(
299                                            getFullName(jobName, groupName),
300                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
301                                                    schedulerResponse, TriggerState.NORMAL));
302                            }
303                            else {
304                                    _schedulerEngine.schedule(
305                                            trigger, description, destinationName, message);
306                            }
307                    }
308                    finally {
309                            _readLock.unlock();
310                    }
311    
312                    setClusterableThreadLocal(groupName);
313            }
314    
315            @Override
316            public void setBeanIdentifier(String beanIdentifier) {
317                    _beanIdentifier = beanIdentifier;
318            }
319    
320            @Override
321            public void shutdown() throws SchedulerException {
322                    _portalReady = false;
323    
324                    ClusterMasterExecutorUtil.
325                            unregisterClusterMasterTokenTransitionListener(
326                                    _schedulerClusterMasterTokenTransitionListener);
327    
328                    _schedulerEngine.shutdown();
329            }
330    
331            @Override
332            public void start() throws SchedulerException {
333                    try {
334                            if (!ClusterMasterExecutorUtil.isMaster()) {
335                                    initMemoryClusteredJobs();
336                            }
337    
338                            _schedulerClusterMasterTokenTransitionListener =
339                                    new SchedulerClusterMasterTokenTransitionListener();
340    
341                            ClusterMasterExecutorUtil.
342                                    registerClusterMasterTokenTransitionListener(
343                                            _schedulerClusterMasterTokenTransitionListener);
344                    }
345                    catch (Exception e) {
346                            throw new SchedulerException("Unable to initialize scheduler", e);
347                    }
348    
349                    _schedulerEngine.start();
350    
351                    _portalReady = true;
352            }
353    
354            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
355            @Override
356            public void suppressError(String jobName, String groupName)
357                    throws SchedulerException {
358    
359                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
360    
361                    if (!memoryClusteredSlaveJob) {
362                            _readLock.lock();
363    
364                            try {
365                                    _schedulerEngine.suppressError(jobName, groupName);
366                            }
367                            finally {
368                                    _readLock.unlock();
369                            }
370                    }
371    
372                    setClusterableThreadLocal(groupName);
373            }
374    
375            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
376            @Override
377            public void unschedule(String groupName) throws SchedulerException {
378                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
379    
380                    _readLock.lock();
381    
382                    try {
383                            if (memoryClusteredSlaveJob) {
384                                    removeMemoryClusteredJobs(groupName);
385                            }
386                            else {
387                                    _schedulerEngine.unschedule(groupName);
388                            }
389                    }
390                    finally {
391                            _readLock.unlock();
392                    }
393    
394                    setClusterableThreadLocal(groupName);
395            }
396    
397            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
398            @Override
399            public void unschedule(String jobName, String groupName)
400                    throws SchedulerException {
401    
402                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
403    
404                    _readLock.lock();
405    
406                    try {
407                            if (memoryClusteredSlaveJob) {
408                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
409                            }
410                            else {
411                                    _schedulerEngine.unschedule(jobName, groupName);
412                            }
413                    }
414                    finally {
415                            _readLock.unlock();
416                    }
417    
418                    setClusterableThreadLocal(groupName);
419            }
420    
421            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
422            @Override
423            public void update(Trigger trigger) throws SchedulerException {
424                    String jobName = trigger.getJobName();
425                    String groupName = trigger.getGroupName();
426    
427                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
428    
429                    _readLock.lock();
430    
431                    try {
432                            if (memoryClusteredSlaveJob) {
433                                    boolean updated = false;
434    
435                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
436                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
437    
438                                            SchedulerResponse schedulerResponse =
439                                                    memoryClusteredJob.getKey();
440    
441                                            if (jobName.equals(schedulerResponse.getJobName()) &&
442                                                    groupName.equals(schedulerResponse.getGroupName())) {
443    
444                                                    schedulerResponse.setTrigger(trigger);
445    
446                                                    updated = true;
447    
448                                                    break;
449                                            }
450                                    }
451    
452                                    if (!updated) {
453                                            throw new SchedulerException(
454                                                    "Unable to update trigger for memory clustered job");
455                                    }
456                            }
457                            else {
458                                    _schedulerEngine.update(trigger);
459                            }
460                    }
461                    finally {
462                            _readLock.unlock();
463                    }
464    
465                    setClusterableThreadLocal(groupName);
466            }
467    
468            protected String getFullName(String jobName, String groupName) {
469                    return groupName.concat(StringPool.PERIOD).concat(jobName);
470            }
471    
472            protected void initMemoryClusteredJobs() throws Exception {
473                    MethodHandler methodHandler = new MethodHandler(
474                            _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
475    
476                    Future<List<SchedulerResponse>> future =
477                            ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
478    
479                    List<SchedulerResponse> schedulerResponses = future.get(
480                            PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
481                            TimeUnit.SECONDS);
482    
483                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
484                            Trigger oldTrigger = schedulerResponse.getTrigger();
485    
486                            String jobName = schedulerResponse.getJobName();
487                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
488                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
489    
490                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
491                                    oldTrigger.getTriggerType(), jobName, groupName,
492                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
493                                    oldTrigger.getTriggerContent());
494    
495                            schedulerResponse.setTrigger(newTrigger);
496    
497                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
498                                    schedulerResponse);
499    
500                            Message message = schedulerResponse.getMessage();
501    
502                            message.remove(JOB_STATE);
503    
504                            _memoryClusteredJobs.put(
505                                    getFullName(jobName, groupName),
506                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
507                                            schedulerResponse, triggerState));
508                    }
509            }
510    
511            protected boolean isMemoryClusteredSlaveJob(String groupName)
512                    throws SchedulerException {
513    
514                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
515                            groupName);
516    
517                    StorageType storageType = objectValuePair.getValue();
518    
519                    if ((storageType != StorageType.MEMORY_CLUSTERED) ||
520                            ClusterMasterExecutorUtil.isMaster()) {
521    
522                            return false;
523                    }
524    
525                    return true;
526            }
527    
528            protected void removeMemoryClusteredJobs(String groupName) {
529                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
530                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
531    
532                    Iterator
533                            <Map.Entry<String,
534                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
535                                            memoryClusteredJobs.iterator();
536    
537                    while (itr.hasNext()) {
538                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
539                                    entry = itr.next();
540    
541                            ObjectValuePair<SchedulerResponse, TriggerState>
542                                    memoryClusteredJob = entry.getValue();
543    
544                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
545    
546                            if (groupName.equals(schedulerResponse.getGroupName())) {
547                                    itr.remove();
548                            }
549                    }
550            }
551    
552            protected ObjectValuePair<String, StorageType> resolveGroupName(
553                    String groupName) {
554    
555                    int index = groupName.indexOf(CharPool.POUND);
556    
557                    String storageTypeString = groupName.substring(0, index);
558    
559                    StorageType storageType = StorageType.valueOf(storageTypeString);
560    
561                    String orginalGroupName = groupName.substring(index + 1);
562    
563                    return new ObjectValuePair<String, StorageType>(
564                            orginalGroupName, storageType);
565            }
566    
567            protected void setClusterableThreadLocal(String groupName) {
568                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
569                            groupName);
570    
571                    ClusterableContextThreadLocal.putThreadLocalContext(
572                            STORAGE_TYPE, objectValuePair.getValue());
573                    ClusterableContextThreadLocal.putThreadLocalContext(
574                            _PORTAL_READY, _portalReady);
575    
576                    boolean pluginReady = true;
577    
578                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
579                            PluginContextLifecycleThreadLocal.isDestroying()) {
580    
581                            pluginReady = false;
582                    }
583    
584                    ClusterableContextThreadLocal.putThreadLocalContext(
585                            _PLUGIN_READY, pluginReady);
586            }
587    
588            protected void updateMemoryClusteredJob(
589                    String jobName, String groupName, TriggerState triggerState) {
590    
591                    ObjectValuePair<SchedulerResponse, TriggerState>
592                            memoryClusteredJob = _memoryClusteredJobs.get(
593                                    getFullName(jobName, groupName));
594    
595                    if (memoryClusteredJob != null) {
596                            memoryClusteredJob.setValue(triggerState);
597                    }
598            }
599    
600            protected void updateMemoryClusteredJobs(
601                    String groupName, TriggerState triggerState) {
602    
603                    for (ObjectValuePair<SchedulerResponse, TriggerState>
604                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
605    
606                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
607    
608                            if (groupName.equals(schedulerResponse.getGroupName())) {
609                                    memoryClusteredJob.setValue(triggerState);
610                            }
611                    }
612            }
613    
614            @BeanReference(
615                    name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
616            protected SchedulerEngine schedulerEngine;
617    
618            private static final String _PLUGIN_READY = "plugin.ready";
619    
620            private static final String _PORTAL_READY = "portal.ready";
621    
622            private static Log _log = LogFactoryUtil.getLog(
623                    ClusterSchedulerEngine.class);
624    
625            private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
626                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
627    
628            private String _beanIdentifier;
629            private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
630                    _memoryClusteredJobs = new ConcurrentHashMap
631                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
632            private boolean _portalReady;
633            private final java.util.concurrent.locks.Lock _readLock;
634            private ClusterMasterTokenTransitionListener
635                    _schedulerClusterMasterTokenTransitionListener;
636            private final SchedulerEngine _schedulerEngine;
637            private final java.util.concurrent.locks.Lock _writeLock;
638    
639            private static class SchedulerClusterInvokeAcceptor
640                    implements ClusterInvokeAcceptor {
641    
642                    @Override
643                    public boolean accept(Map<String, Serializable> context) {
644                            if (ClusterInvokeThreadLocal.isEnabled()) {
645                                    return false;
646                            }
647    
648                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
649                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
650                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
651    
652                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
653                                    !pluginReady) {
654    
655                                    return false;
656                            }
657    
658                            return true;
659                    }
660    
661            }
662    
663            private class SchedulerClusterMasterTokenTransitionListener
664                    extends BaseClusterMasterTokenTransitionListener {
665    
666                    @Override
667                    protected void doMasterTokenAcquired() throws Exception {
668                            boolean forceSync = ProxyModeThreadLocal.isForceSync();
669    
670                            ProxyModeThreadLocal.setForceSync(true);
671    
672                            _writeLock.lock();
673    
674                            try {
675                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
676                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
677    
678                                            SchedulerResponse schedulerResponse =
679                                                    memoryClusteredJob.getKey();
680    
681                                            _schedulerEngine.schedule(
682                                                    schedulerResponse.getTrigger(),
683                                                    schedulerResponse.getDescription(),
684                                                    schedulerResponse.getDestinationName(),
685                                                    schedulerResponse.getMessage());
686    
687                                            TriggerState triggerState = memoryClusteredJob.getValue();
688    
689                                            if (triggerState.equals(TriggerState.PAUSED)) {
690                                                    _schedulerEngine.pause(
691                                                            schedulerResponse.getJobName(),
692                                                            schedulerResponse.getGroupName());
693                                            }
694                                    }
695    
696                                    _memoryClusteredJobs.clear();
697    
698                                    if (_log.isInfoEnabled()) {
699                                            _log.info("MEMORY_CLUSTERED jobs are running on this node");
700                                    }
701                            }
702                            finally {
703                                    ProxyModeThreadLocal.setForceSync(forceSync);
704    
705                                    _writeLock.unlock();
706                            }
707                    }
708    
709                    @Override
710                    protected void doMasterTokenReleased() throws Exception {
711                            _writeLock.lock();
712    
713                            try {
714                                    for (SchedulerResponse schedulerResponse :
715                                                    _schedulerEngine.getScheduledJobs()) {
716    
717                                            if (StorageType.MEMORY_CLUSTERED ==
718                                                            schedulerResponse.getStorageType()) {
719    
720                                                    String groupName =
721                                                            SchedulerEngineHelperUtil.namespaceGroupName(
722                                                                    schedulerResponse.getGroupName(),
723                                                                    StorageType.MEMORY_CLUSTERED);
724    
725                                                    _schedulerEngine.delete(
726                                                            schedulerResponse.getJobName(), groupName);
727                                            }
728                                    }
729    
730                                    initMemoryClusteredJobs();
731    
732                                    if (_log.isInfoEnabled()) {
733                                            _log.info(
734                                                    "MEMORY_CLUSTERED jobs stop running on this node");
735                                    }
736                            }
737                            finally {
738                                    _writeLock.unlock();
739                            }
740                    }
741    
742            }
743    
744    }