001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018    import com.liferay.portal.kernel.bean.BeanReference;
019    import com.liferay.portal.kernel.bean.IdentifiableBean;
020    import com.liferay.portal.kernel.cluster.BaseClusterMasterTokenTransitionListener;
021    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
022    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
023    import com.liferay.portal.kernel.cluster.ClusterMasterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
025    import com.liferay.portal.kernel.cluster.Clusterable;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.messaging.Message;
029    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
030    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
031    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
032    import com.liferay.portal.kernel.scheduler.SchedulerException;
033    import com.liferay.portal.kernel.scheduler.StorageType;
034    import com.liferay.portal.kernel.scheduler.Trigger;
035    import com.liferay.portal.kernel.scheduler.TriggerState;
036    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
037    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
038    import com.liferay.portal.kernel.util.MethodHandler;
039    import com.liferay.portal.kernel.util.MethodKey;
040    import com.liferay.portal.kernel.util.ObjectValuePair;
041    import com.liferay.portal.kernel.util.StringPool;
042    import com.liferay.portal.util.PropsValues;
043    
044    import java.io.Serializable;
045    
046    import java.util.Iterator;
047    import java.util.List;
048    import java.util.Map;
049    import java.util.Set;
050    import java.util.concurrent.ConcurrentHashMap;
051    import java.util.concurrent.Future;
052    import java.util.concurrent.TimeUnit;
053    import java.util.concurrent.locks.ReadWriteLock;
054    import java.util.concurrent.locks.ReentrantReadWriteLock;
055    
056    /**
057     * @author Tina Tian
058     */
059    public class ClusterSchedulerEngine
060            implements IdentifiableBean, SchedulerEngine {
061    
062            public static SchedulerEngine createClusterSchedulerEngine(
063                    SchedulerEngine schedulerEngine) {
064    
065                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
066                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
067                    }
068    
069                    return schedulerEngine;
070            }
071    
072            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
073                    _schedulerEngine = schedulerEngine;
074    
075                    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
076    
077                    _readLock = readWriteLock.readLock();
078                    _writeLock = readWriteLock.writeLock();
079            }
080    
081            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
082            @Override
083            public void delete(String groupName, StorageType storageType)
084                    throws SchedulerException {
085    
086                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
087                            storageType);
088    
089                    _readLock.lock();
090    
091                    try {
092                            if (memoryClusteredSlaveJob) {
093                                    removeMemoryClusteredJobs(groupName);
094                            }
095                            else {
096                                    _schedulerEngine.delete(groupName, storageType);
097                            }
098                    }
099                    finally {
100                            _readLock.unlock();
101                    }
102    
103                    setClusterableThreadLocal(storageType);
104            }
105    
106            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
107            @Override
108            public void delete(
109                            String jobName, String groupName, StorageType storageType)
110                    throws SchedulerException {
111    
112                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
113                            storageType);
114    
115                    _readLock.lock();
116    
117                    try {
118                            if (memoryClusteredSlaveJob) {
119                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
120                            }
121                            else {
122                                    _schedulerEngine.delete(jobName, groupName, storageType);
123                            }
124                    }
125                    finally {
126                            _readLock.unlock();
127                    }
128    
129                    setClusterableThreadLocal(storageType);
130            }
131    
132            @Override
133            public String getBeanIdentifier() {
134                    return _beanIdentifier;
135            }
136    
137            @Clusterable(onMaster = true)
138            @Override
139            public SchedulerResponse getScheduledJob(
140                            String jobName, String groupName, StorageType storageType)
141                    throws SchedulerException {
142    
143                    _readLock.lock();
144    
145                    try {
146                            return _schedulerEngine.getScheduledJob(
147                                    jobName, groupName, storageType);
148                    }
149                    finally {
150                            _readLock.unlock();
151                    }
152            }
153    
154            @Clusterable(onMaster = true)
155            @Override
156            public List<SchedulerResponse> getScheduledJobs()
157                    throws SchedulerException {
158    
159                    _readLock.lock();
160    
161                    try {
162                            return _schedulerEngine.getScheduledJobs();
163                    }
164                    finally {
165                            _readLock.unlock();
166                    }
167            }
168    
169            @Clusterable(onMaster = true)
170            @Override
171            public List<SchedulerResponse> getScheduledJobs(StorageType storageType)
172                    throws SchedulerException {
173    
174                    _readLock.lock();
175    
176                    try {
177                            return _schedulerEngine.getScheduledJobs(storageType);
178                    }
179                    finally {
180                            _readLock.unlock();
181                    }
182            }
183    
184            @Clusterable(onMaster = true)
185            @Override
186            public List<SchedulerResponse> getScheduledJobs(
187                            String groupName, StorageType storageType)
188                    throws SchedulerException {
189    
190                    _readLock.lock();
191    
192                    try {
193                            return _schedulerEngine.getScheduledJobs(groupName, storageType);
194                    }
195                    finally {
196                            _readLock.unlock();
197                    }
198            }
199    
200            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
201            @Override
202            public void pause(String groupName, StorageType storageType)
203                    throws SchedulerException {
204    
205                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
206                            storageType);
207    
208                    _readLock.lock();
209    
210                    try {
211                            if (memoryClusteredSlaveJob) {
212                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
213                            }
214                            else {
215                                    _schedulerEngine.pause(groupName, storageType);
216                            }
217                    }
218                    finally {
219                            _readLock.unlock();
220                    }
221    
222                    setClusterableThreadLocal(storageType);
223            }
224    
225            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
226            @Override
227            public void pause(String jobName, String groupName, StorageType storageType)
228                    throws SchedulerException {
229    
230                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
231                            storageType);
232    
233                    _readLock.lock();
234    
235                    try {
236                            if (memoryClusteredSlaveJob) {
237                                    updateMemoryClusteredJob(
238                                            jobName, groupName, TriggerState.PAUSED);
239                            }
240                            else {
241                                    _schedulerEngine.pause(jobName, groupName, storageType);
242                            }
243                    }
244                    finally {
245                            _readLock.unlock();
246                    }
247    
248                    setClusterableThreadLocal(storageType);
249            }
250    
251            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
252            @Override
253            public void resume(String groupName, StorageType storageType)
254                    throws SchedulerException {
255    
256                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
257                            storageType);
258    
259                    _readLock.lock();
260    
261                    try {
262                            if (memoryClusteredSlaveJob) {
263                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
264                            }
265                            else {
266                                    _schedulerEngine.resume(groupName, storageType);
267                            }
268                    }
269                    finally {
270                            _readLock.unlock();
271                    }
272    
273                    setClusterableThreadLocal(storageType);
274            }
275    
276            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
277            @Override
278            public void resume(
279                            String jobName, String groupName, StorageType storageType)
280                    throws SchedulerException {
281    
282                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
283                            storageType);
284    
285                    _readLock.lock();
286    
287                    try {
288                            if (memoryClusteredSlaveJob) {
289                                    updateMemoryClusteredJob(
290                                            jobName, groupName, TriggerState.NORMAL);
291                            }
292                            else {
293                                    _schedulerEngine.resume(jobName, groupName, storageType);
294                            }
295                    }
296                    finally {
297                            _readLock.unlock();
298                    }
299    
300                    setClusterableThreadLocal(storageType);
301            }
302    
303            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
304            @Override
305            public void schedule(
306                            Trigger trigger, String description, String destinationName,
307                            Message message, StorageType storageType)
308                    throws SchedulerException {
309    
310                    String groupName = trigger.getGroupName();
311                    String jobName = trigger.getJobName();
312    
313                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
314                            storageType);
315    
316                    _readLock.lock();
317    
318                    try {
319                            if (memoryClusteredSlaveJob) {
320                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
321    
322                                    schedulerResponse.setDescription(description);
323                                    schedulerResponse.setDestinationName(destinationName);
324                                    schedulerResponse.setGroupName(groupName);
325                                    schedulerResponse.setJobName(jobName);
326                                    schedulerResponse.setMessage(message);
327                                    schedulerResponse.setTrigger(trigger);
328                                    schedulerResponse.setStorageType(storageType);
329    
330                                    _memoryClusteredJobs.put(
331                                            getFullName(jobName, groupName),
332                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
333                                                    schedulerResponse, TriggerState.NORMAL));
334                            }
335                            else {
336                                    _schedulerEngine.schedule(
337                                            trigger, description, destinationName, message,
338                                            storageType);
339                            }
340                    }
341                    finally {
342                            _readLock.unlock();
343                    }
344    
345                    setClusterableThreadLocal(storageType);
346            }
347    
348            @Override
349            public void setBeanIdentifier(String beanIdentifier) {
350                    _beanIdentifier = beanIdentifier;
351            }
352    
353            @Override
354            public void shutdown() throws SchedulerException {
355                    _portalReady = false;
356    
357                    ClusterMasterExecutorUtil.
358                            unregisterClusterMasterTokenTransitionListener(
359                                    _schedulerClusterMasterTokenTransitionListener);
360    
361                    _schedulerEngine.shutdown();
362            }
363    
364            @Override
365            public void start() throws SchedulerException {
366                    try {
367                            if (!ClusterMasterExecutorUtil.isMaster()) {
368                                    initMemoryClusteredJobs();
369                            }
370    
371                            _schedulerClusterMasterTokenTransitionListener =
372                                    new SchedulerClusterMasterTokenTransitionListener();
373    
374                            ClusterMasterExecutorUtil.
375                                    registerClusterMasterTokenTransitionListener(
376                                            _schedulerClusterMasterTokenTransitionListener);
377                    }
378                    catch (Exception e) {
379                            throw new SchedulerException("Unable to initialize scheduler", e);
380                    }
381    
382                    _schedulerEngine.start();
383    
384                    _portalReady = true;
385            }
386    
387            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
388            @Override
389            public void suppressError(
390                            String jobName, String groupName, StorageType storageType)
391                    throws SchedulerException {
392    
393                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
394                            storageType);
395    
396                    if (!memoryClusteredSlaveJob) {
397                            _readLock.lock();
398    
399                            try {
400                                    _schedulerEngine.suppressError(jobName, groupName, storageType);
401                            }
402                            finally {
403                                    _readLock.unlock();
404                            }
405                    }
406    
407                    setClusterableThreadLocal(storageType);
408            }
409    
410            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
411            @Override
412            public void unschedule(String groupName, StorageType storageType)
413                    throws SchedulerException {
414    
415                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
416                            storageType);
417    
418                    _readLock.lock();
419    
420                    try {
421                            if (memoryClusteredSlaveJob) {
422                                    removeMemoryClusteredJobs(groupName);
423                            }
424                            else {
425                                    _schedulerEngine.unschedule(groupName, storageType);
426                            }
427                    }
428                    finally {
429                            _readLock.unlock();
430                    }
431    
432                    setClusterableThreadLocal(storageType);
433            }
434    
435            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
436            @Override
437            public void unschedule(
438                            String jobName, String groupName, StorageType storageType)
439                    throws SchedulerException {
440    
441                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
442                            storageType);
443    
444                    _readLock.lock();
445    
446                    try {
447                            if (memoryClusteredSlaveJob) {
448                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449                            }
450                            else {
451                                    _schedulerEngine.unschedule(jobName, groupName, storageType);
452                            }
453                    }
454                    finally {
455                            _readLock.unlock();
456                    }
457    
458                    setClusterableThreadLocal(storageType);
459            }
460    
461            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
462            @Override
463            public void update(Trigger trigger, StorageType storageType)
464                    throws SchedulerException {
465    
466                    String jobName = trigger.getJobName();
467                    String groupName = trigger.getGroupName();
468    
469                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(
470                            storageType);
471    
472                    _readLock.lock();
473    
474                    try {
475                            if (memoryClusteredSlaveJob) {
476                                    boolean updated = false;
477    
478                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
479                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
480    
481                                            SchedulerResponse schedulerResponse =
482                                                    memoryClusteredJob.getKey();
483    
484                                            if (jobName.equals(schedulerResponse.getJobName()) &&
485                                                    groupName.equals(schedulerResponse.getGroupName())) {
486    
487                                                    schedulerResponse.setTrigger(trigger);
488    
489                                                    updated = true;
490    
491                                                    break;
492                                            }
493                                    }
494    
495                                    if (!updated) {
496                                            throw new SchedulerException(
497                                                    "Unable to update trigger for memory clustered job");
498                                    }
499                            }
500                            else {
501                                    _schedulerEngine.update(trigger, storageType);
502                            }
503                    }
504                    finally {
505                            _readLock.unlock();
506                    }
507    
508                    setClusterableThreadLocal(storageType);
509            }
510    
511            protected String getFullName(String jobName, String groupName) {
512                    return groupName.concat(StringPool.PERIOD).concat(jobName);
513            }
514    
515            protected void initMemoryClusteredJobs() throws Exception {
516                    MethodHandler methodHandler = new MethodHandler(
517                            _getScheduledJobsMethodKey, StorageType.MEMORY_CLUSTERED);
518    
519                    Future<List<SchedulerResponse>> future =
520                            ClusterMasterExecutorUtil.executeOnMaster(methodHandler);
521    
522                    List<SchedulerResponse> schedulerResponses = future.get(
523                            PropsValues.CLUSTERABLE_ADVICE_CALL_MASTER_TIMEOUT,
524                            TimeUnit.SECONDS);
525    
526                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
527                            String jobName = schedulerResponse.getJobName();
528                            String groupName = schedulerResponse.getGroupName();
529    
530                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
531                                    schedulerResponse);
532    
533                            Message message = schedulerResponse.getMessage();
534    
535                            message.remove(JOB_STATE);
536    
537                            _memoryClusteredJobs.put(
538                                    getFullName(jobName, groupName),
539                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
540                                            schedulerResponse, triggerState));
541                    }
542            }
543    
544            protected boolean isMemoryClusteredSlaveJob(StorageType storageType) {
545                    if ((storageType != StorageType.MEMORY_CLUSTERED) ||
546                            ClusterMasterExecutorUtil.isMaster()) {
547    
548                            return false;
549                    }
550    
551                    return true;
552            }
553    
554            protected void removeMemoryClusteredJobs(String groupName) {
555                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
556                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
557    
558                    Iterator
559                            <Map.Entry<String,
560                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
561                                            memoryClusteredJobs.iterator();
562    
563                    while (itr.hasNext()) {
564                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
565                                    entry = itr.next();
566    
567                            ObjectValuePair<SchedulerResponse, TriggerState>
568                                    memoryClusteredJob = entry.getValue();
569    
570                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
571    
572                            if (groupName.equals(schedulerResponse.getGroupName())) {
573                                    itr.remove();
574                            }
575                    }
576            }
577    
578            protected void setClusterableThreadLocal(StorageType storageType) {
579                    ClusterableContextThreadLocal.putThreadLocalContext(
580                            STORAGE_TYPE, storageType);
581                    ClusterableContextThreadLocal.putThreadLocalContext(
582                            _PORTAL_READY, _portalReady);
583    
584                    boolean pluginReady = true;
585    
586                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
587                            PluginContextLifecycleThreadLocal.isDestroying()) {
588    
589                            pluginReady = false;
590                    }
591    
592                    ClusterableContextThreadLocal.putThreadLocalContext(
593                            _PLUGIN_READY, pluginReady);
594            }
595    
596            protected void updateMemoryClusteredJob(
597                    String jobName, String groupName, TriggerState triggerState) {
598    
599                    ObjectValuePair<SchedulerResponse, TriggerState>
600                            memoryClusteredJob = _memoryClusteredJobs.get(
601                                    getFullName(jobName, groupName));
602    
603                    if (memoryClusteredJob != null) {
604                            memoryClusteredJob.setValue(triggerState);
605                    }
606            }
607    
608            protected void updateMemoryClusteredJobs(
609                    String groupName, TriggerState triggerState) {
610    
611                    for (ObjectValuePair<SchedulerResponse, TriggerState>
612                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
613    
614                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
615    
616                            if (groupName.equals(schedulerResponse.getGroupName())) {
617                                    memoryClusteredJob.setValue(triggerState);
618                            }
619                    }
620            }
621    
622            @BeanReference(
623                    name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
624            protected SchedulerEngine schedulerEngine;
625    
626            private static final String _PLUGIN_READY = "plugin.ready";
627    
628            private static final String _PORTAL_READY = "portal.ready";
629    
630            private static final Log _log = LogFactoryUtil.getLog(
631                    ClusterSchedulerEngine.class);
632    
633            private static final MethodKey _getScheduledJobsMethodKey = new MethodKey(
634                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
635    
636            private String _beanIdentifier;
637            private final Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
638                    _memoryClusteredJobs = new ConcurrentHashMap
639                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
640            private boolean _portalReady;
641            private final java.util.concurrent.locks.Lock _readLock;
642            private ClusterMasterTokenTransitionListener
643                    _schedulerClusterMasterTokenTransitionListener;
644            private final SchedulerEngine _schedulerEngine;
645            private final java.util.concurrent.locks.Lock _writeLock;
646    
647            private static class SchedulerClusterInvokeAcceptor
648                    implements ClusterInvokeAcceptor {
649    
650                    @Override
651                    public boolean accept(Map<String, Serializable> context) {
652                            if (!ClusterInvokeThreadLocal.isEnabled()) {
653                                    return false;
654                            }
655    
656                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
657                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
658                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
659    
660                            if ((storageType == StorageType.PERSISTED) || !portalReady ||
661                                    !pluginReady) {
662    
663                                    return false;
664                            }
665    
666                            return true;
667                    }
668    
669            }
670    
671            private class SchedulerClusterMasterTokenTransitionListener
672                    extends BaseClusterMasterTokenTransitionListener {
673    
674                    @Override
675                    protected void doMasterTokenAcquired() throws Exception {
676                            boolean forceSync = ProxyModeThreadLocal.isForceSync();
677    
678                            ProxyModeThreadLocal.setForceSync(true);
679    
680                            _writeLock.lock();
681    
682                            try {
683                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
684                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
685    
686                                            SchedulerResponse schedulerResponse =
687                                                    memoryClusteredJob.getKey();
688    
689                                            _schedulerEngine.schedule(
690                                                    schedulerResponse.getTrigger(),
691                                                    schedulerResponse.getDescription(),
692                                                    schedulerResponse.getDestinationName(),
693                                                    schedulerResponse.getMessage(),
694                                                    schedulerResponse.getStorageType());
695    
696                                            TriggerState triggerState = memoryClusteredJob.getValue();
697    
698                                            if (triggerState.equals(TriggerState.PAUSED)) {
699                                                    _schedulerEngine.pause(
700                                                            schedulerResponse.getJobName(),
701                                                            schedulerResponse.getGroupName(),
702                                                            schedulerResponse.getStorageType());
703                                            }
704                                    }
705    
706                                    _memoryClusteredJobs.clear();
707    
708                                    if (_log.isInfoEnabled()) {
709                                            _log.info("MEMORY_CLUSTERED jobs are running on this node");
710                                    }
711                            }
712                            finally {
713                                    ProxyModeThreadLocal.setForceSync(forceSync);
714    
715                                    _writeLock.unlock();
716                            }
717                    }
718    
719                    @Override
720                    protected void doMasterTokenReleased() throws Exception {
721                            _writeLock.lock();
722    
723                            try {
724                                    for (SchedulerResponse schedulerResponse :
725                                                    _schedulerEngine.getScheduledJobs()) {
726    
727                                            if (StorageType.MEMORY_CLUSTERED ==
728                                                            schedulerResponse.getStorageType()) {
729    
730                                                    _schedulerEngine.delete(
731                                                            schedulerResponse.getJobName(),
732                                                            schedulerResponse.getGroupName(),
733                                                            schedulerResponse.getStorageType());
734                                            }
735                                    }
736    
737                                    initMemoryClusteredJobs();
738    
739                                    if (_log.isInfoEnabled()) {
740                                            _log.info(
741                                                    "MEMORY_CLUSTERED jobs stopped running on this node");
742                                    }
743                            }
744                            finally {
745                                    _writeLock.unlock();
746                            }
747                    }
748    
749            }
750    
751    }