001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.IdentifiableBean;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterEvent;
021    import com.liferay.portal.kernel.cluster.ClusterEventListener;
022    import com.liferay.portal.kernel.cluster.ClusterEventType;
023    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.Clusterable;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.messaging.Message;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037    import com.liferay.portal.kernel.scheduler.SchedulerException;
038    import com.liferay.portal.kernel.scheduler.StorageType;
039    import com.liferay.portal.kernel.scheduler.Trigger;
040    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041    import com.liferay.portal.kernel.scheduler.TriggerState;
042    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043    import com.liferay.portal.kernel.util.Base64;
044    import com.liferay.portal.kernel.util.CharPool;
045    import com.liferay.portal.kernel.util.MethodHandler;
046    import com.liferay.portal.kernel.util.MethodKey;
047    import com.liferay.portal.kernel.util.ObjectValuePair;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050    import com.liferay.portal.model.Lock;
051    import com.liferay.portal.service.LockLocalServiceUtil;
052    import com.liferay.portal.util.PropsValues;
053    
054    import java.io.ObjectInputStream;
055    import java.io.ObjectOutputStream;
056    
057    import java.util.Iterator;
058    import java.util.List;
059    import java.util.Map;
060    import java.util.Set;
061    import java.util.concurrent.ConcurrentHashMap;
062    import java.util.concurrent.TimeUnit;
063    import java.util.concurrent.locks.ReadWriteLock;
064    import java.util.concurrent.locks.ReentrantReadWriteLock;
065    
066    /**
067     * @author Tina Tian
068     */
069    public class ClusterSchedulerEngine
070            implements IdentifiableBean, SchedulerEngine,
071                               SchedulerEngineClusterManager {
072    
073            public static SchedulerEngine createClusterSchedulerEngine(
074                    SchedulerEngine schedulerEngine) {
075    
076                    if (PropsValues.CLUSTER_LINK_ENABLED) {
077                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
078                    }
079    
080                    return schedulerEngine;
081            }
082    
083            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
084                    _schedulerEngine = schedulerEngine;
085            }
086    
087            @Clusterable
088            public void delete(String groupName) throws SchedulerException {
089                    try {
090                            if (isMemorySchedulerSlave(groupName)) {
091                                    removeMemoryClusteredJobs(groupName);
092    
093                                    return;
094                            }
095                    }
096                    catch (Exception e) {
097                            throw new SchedulerException(
098                                    "Unable to delete jobs in group " + groupName, e);
099                    }
100    
101                    _readLock.lock();
102    
103                    try {
104                            _schedulerEngine.delete(groupName);
105                    }
106                    finally {
107                            _readLock.unlock();
108                    }
109    
110                    skipClusterInvoking(groupName);
111            }
112    
113            @Clusterable
114            public void delete(String jobName, String groupName)
115                    throws SchedulerException {
116    
117                    try {
118                            if (isMemorySchedulerSlave(groupName)) {
119                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
120    
121                                    return;
122                            }
123                    }
124                    catch (Exception e) {
125                            throw new SchedulerException(
126                                    "Unable to delete job {jobName=" + jobName + ", groupName=" +
127                                            groupName + "}",
128                                    e);
129                    }
130    
131                    _readLock.lock();
132    
133                    try {
134                            _schedulerEngine.delete(jobName, groupName);
135                    }
136                    finally {
137                            _readLock.unlock();
138                    }
139    
140                    skipClusterInvoking(groupName);
141            }
142    
143            public String getBeanIdentifier() {
144                    return _beanIdentifier;
145            }
146    
147            public SchedulerResponse getScheduledJob(String jobName, String groupName)
148                    throws SchedulerException {
149    
150                    try {
151                            if (isMemorySchedulerSlave(groupName)) {
152                                    return (SchedulerResponse)callMaster(
153                                            _getScheduledJobMethodKey, jobName, groupName);
154                            }
155                    }
156                    catch (Exception e) {
157                            throw new SchedulerException(
158                                    "Unable to get job {jobName=" + jobName + ", groupName=" +
159                                            groupName + "}",
160                                    e);
161                    }
162    
163                    _readLock.lock();
164    
165                    try {
166                            return _schedulerEngine.getScheduledJob(jobName, groupName);
167                    }
168                    finally {
169                            _readLock.unlock();
170                    }
171            }
172    
173            public List<SchedulerResponse> getScheduledJobs()
174                    throws SchedulerException {
175    
176                    try {
177                            if (isMemorySchedulerSlave()) {
178                                    return (List<SchedulerResponse>)callMaster(
179                                            _getScheduledJobsMethodKey1);
180                            }
181                    }
182                    catch (Exception e) {
183                            throw new SchedulerException("Unable to get jobs", e);
184                    }
185    
186                    _readLock.lock();
187    
188                    try {
189                            return _schedulerEngine.getScheduledJobs();
190                    }
191                    finally {
192                            _readLock.unlock();
193                    }
194            }
195    
196            public List<SchedulerResponse> getScheduledJobs(String groupName)
197                    throws SchedulerException {
198    
199                    try {
200                            if (isMemorySchedulerSlave(groupName)) {
201                                    return (List<SchedulerResponse>)callMaster(
202                                            _getScheduledJobsMethodKey2, groupName);
203                            }
204                    }
205                    catch (Exception e) {
206                            throw new SchedulerException(
207                                    "Unable to get jobs in group " + groupName, e);
208                    }
209    
210                    _readLock.lock();
211    
212                    try {
213                            return _schedulerEngine.getScheduledJobs(groupName);
214                    }
215                    finally {
216                            _readLock.unlock();
217                    }
218            }
219    
220            @Clusterable
221            public void pause(String groupName) throws SchedulerException {
222                    try {
223                            if (isMemorySchedulerSlave(groupName)) {
224                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
225    
226                                    return;
227                            }
228                    }
229                    catch (Exception e) {
230                            throw new SchedulerException(
231                                    "Unable to pause jobs in group " + groupName, e);
232                    }
233    
234                    _readLock.lock();
235    
236                    try {
237                            _schedulerEngine.pause(groupName);
238                    }
239                    finally {
240                            _readLock.unlock();
241                    }
242    
243                    skipClusterInvoking(groupName);
244            }
245    
246            @Clusterable
247            public void pause(String jobName, String groupName)
248                    throws SchedulerException {
249    
250                    try {
251                            if (isMemorySchedulerSlave(groupName)) {
252                                    updateMemoryClusteredJob(
253                                            jobName, groupName, TriggerState.PAUSED);
254    
255                                    return;
256                            }
257                    }
258                    catch (Exception e) {
259                            throw new SchedulerException(
260                                    "Unable to pause job {jobName=" + jobName + ", groupName=" +
261                                            groupName + "}",
262                                    e);
263                    }
264    
265                    _readLock.lock();
266    
267                    try {
268                            _schedulerEngine.pause(jobName, groupName);
269                    }
270                    finally {
271                            _readLock.unlock();
272                    }
273    
274                    skipClusterInvoking(groupName);
275            }
276    
277            @Clusterable
278            public void resume(String groupName) throws SchedulerException {
279                    try {
280                            if (isMemorySchedulerSlave(groupName)) {
281                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
282    
283                                    return;
284                            }
285                    }
286                    catch (Exception e) {
287                            throw new SchedulerException(
288                                    "Unable to resume jobs in group " + groupName, e);
289                    }
290    
291                    _readLock.lock();
292    
293                    try {
294                            _schedulerEngine.resume(groupName);
295                    }
296                    finally {
297                            _readLock.unlock();
298                    }
299    
300                    skipClusterInvoking(groupName);
301            }
302    
303            @Clusterable
304            public void resume(String jobName, String groupName)
305                    throws SchedulerException {
306    
307                    try {
308                            if (isMemorySchedulerSlave(groupName)) {
309                                    updateMemoryClusteredJob(
310                                            jobName, groupName, TriggerState.NORMAL);
311    
312                                    return;
313                            }
314                    }
315                    catch (Exception e) {
316                            throw new SchedulerException(
317                                    "Unable to resume job {jobName=" + jobName + ", groupName=" +
318                                            groupName + "}",
319                                    e);
320                    }
321    
322                    _readLock.lock();
323    
324                    try {
325                            _schedulerEngine.resume(jobName, groupName);
326                    }
327                    finally {
328                            _readLock.unlock();
329                    }
330    
331                    skipClusterInvoking(groupName);
332            }
333    
334            @Clusterable
335            public void schedule(
336                            Trigger trigger, String description, String destinationName,
337                            Message message)
338                    throws SchedulerException {
339    
340                    String groupName = trigger.getGroupName();
341                    String jobName = trigger.getJobName();
342    
343                    try {
344                            if (isMemorySchedulerSlave(groupName)) {
345                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
346    
347                                    schedulerResponse.setDescription(description);
348                                    schedulerResponse.setDestinationName(destinationName);
349                                    schedulerResponse.setGroupName(groupName);
350                                    schedulerResponse.setJobName(jobName);
351                                    schedulerResponse.setMessage(message);
352                                    schedulerResponse.setTrigger(trigger);
353    
354                                    _memoryClusteredJobs.put(
355                                            getFullName(jobName, groupName),
356                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
357                                                    schedulerResponse, TriggerState.NORMAL));
358    
359                                    return;
360                            }
361                    }
362                    catch (Exception e) {
363                            throw new SchedulerException(
364                                    "Unable to schedule job {jobName=" + jobName + ", groupName=" +
365                                            groupName + "}",
366                                    e);
367                    }
368    
369                    _readLock.lock();
370    
371                    try {
372                            _schedulerEngine.schedule(
373                                    trigger, description, destinationName, message);
374                    }
375                    finally {
376                            _readLock.unlock();
377                    }
378    
379                    skipClusterInvoking(groupName);
380            }
381    
382            public void setBeanIdentifier(String beanIdentifier) {
383                    _beanIdentifier = beanIdentifier;
384            }
385    
386            public void shutdown() throws SchedulerException {
387                    try {
388                            ClusterExecutorUtil.removeClusterEventListener(
389                                    _clusterEventListener);
390    
391                            LockLocalServiceUtil.unlock(
392                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
393                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
394                    }
395                    catch (Exception e) {
396                            throw new SchedulerException("Unable to shutdown scheduler", e);
397                    }
398    
399                    _schedulerEngine.shutdown();
400            }
401    
402            public void start() throws SchedulerException {
403                    try {
404                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
405    
406                            _readLock = readWriteLock.readLock();
407                            _writeLock = readWriteLock.writeLock();
408    
409                            _localClusterNodeAddress = getSerializedString(
410                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
411    
412                            _clusterEventListener = new MemorySchedulerClusterEventListener();
413    
414                            ClusterExecutorUtil.addClusterEventListener(
415                                    _clusterEventListener);
416    
417                            if (!isMemorySchedulerClusterLockOwner(
418                                            lockMemorySchedulerCluster(null))) {
419    
420                                    initMemoryClusteredJobs();
421                            }
422                    }
423                    catch (Exception e) {
424                            throw new SchedulerException("Unable to start scheduler", e);
425                    }
426    
427                    _schedulerEngine.start();
428            }
429    
430            @Clusterable
431            public void suppressError(String jobName, String groupName)
432                    throws SchedulerException {
433    
434                    try {
435                            if (isMemorySchedulerSlave(groupName)) {
436                                    return;
437                            }
438                    }
439                    catch (Exception e) {
440                            throw new SchedulerException(
441                                    "Unable to suppress error for job {jobName=" + jobName +
442                                            ", groupName=" + groupName + "}",
443                                    e);
444                    }
445    
446                    _readLock.lock();
447    
448                    try {
449                            _schedulerEngine.suppressError(jobName, groupName);
450                    }
451                    finally {
452                            _readLock.unlock();
453                    }
454    
455                    skipClusterInvoking(groupName);
456            }
457    
458            @Clusterable
459            public void unschedule(String groupName) throws SchedulerException {
460                    try {
461                            if (isMemorySchedulerSlave(groupName)) {
462                                    removeMemoryClusteredJobs(groupName);
463    
464                                    return;
465                            }
466                    }
467                    catch (Exception e) {
468                            throw new SchedulerException(
469                                    "Unable to unschedule jobs in group " + groupName, e);
470                    }
471    
472                    _readLock.lock();
473    
474                    try {
475                            _schedulerEngine.unschedule(groupName);
476                    }
477                    finally {
478                            _readLock.unlock();
479                    }
480    
481                    skipClusterInvoking(groupName);
482            }
483    
484            @Clusterable
485            public void unschedule(String jobName, String groupName)
486                    throws SchedulerException {
487    
488                    try {
489                            if (isMemorySchedulerSlave(groupName)) {
490                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
491    
492                                    return;
493                            }
494                    }
495                    catch (Exception e) {
496                            throw new SchedulerException(
497                                    "Unable to unschedule job {jobName=" + jobName +
498                                            ", groupName=" + groupName + "}",
499                                    e);
500                    }
501    
502                    _readLock.lock();
503    
504                    try {
505                            _schedulerEngine.unschedule(jobName, groupName);
506                    }
507                    finally {
508                            _readLock.unlock();
509                    }
510    
511                    skipClusterInvoking(groupName);
512            }
513    
514            @Clusterable
515            public void update(Trigger trigger) throws SchedulerException {
516                    String jobName = trigger.getJobName();
517                    String groupName = trigger.getGroupName();
518    
519                    try {
520                            if (isMemorySchedulerSlave(groupName)) {
521                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
522                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
523    
524                                            SchedulerResponse schedulerResponse =
525                                                    memoryClusteredJob.getKey();
526    
527                                            if (jobName.equals(schedulerResponse.getJobName()) &&
528                                                    groupName.equals(schedulerResponse.getGroupName())) {
529    
530                                                    schedulerResponse.setTrigger(trigger);
531    
532                                                    return;
533                                            }
534                                    }
535    
536                                    throw new Exception(
537                                            "Unable to update trigger for memory clustered job");
538                            }
539                    }
540                    catch (Exception e) {
541                            throw new SchedulerException(
542                                    "Unable to update job {jobName=" + jobName + ", groupName=" +
543                                            groupName + "}",
544                                    e);
545                    }
546    
547                    _readLock.lock();
548    
549                    try {
550                            _schedulerEngine.update(trigger);
551                    }
552                    finally {
553                            _readLock.unlock();
554                    }
555    
556                    skipClusterInvoking(groupName);
557            }
558    
559            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
560                    try {
561                            Lock lock = lockMemorySchedulerCluster(null);
562    
563                            Address address = (Address)getDeserializedObject(lock.getOwner());
564    
565                            if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
566                                    return lock;
567                            }
568    
569                            return lockMemorySchedulerCluster(lock.getOwner());
570                    }
571                    catch (Exception e) {
572                            throw new SchedulerException(
573                                    "Unable to update memory scheduler cluster master", e);
574                    }
575            }
576    
577            protected Object callMaster(MethodKey methodKey, Object... arguments)
578                    throws Exception {
579    
580                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
581    
582                    Lock lock = updateMemorySchedulerClusterMaster();
583    
584                    Address address = (Address)getDeserializedObject(lock.getOwner());
585    
586                    if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
587                            if (methodKey == _getScheduledJobsMethodKey3) {
588                                    return methodHandler.invoke(false);
589                            }
590                            else {
591                                    return methodHandler.invoke(schedulerEngine);
592                            }
593                    }
594    
595                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
596                            methodHandler, address);
597    
598                    clusterRequest.setBeanIdentifier(_beanIdentifier);
599    
600                    FutureClusterResponses futureClusterResponses =
601                            ClusterExecutorUtil.execute(clusterRequest);
602    
603                    try {
604                            ClusterNodeResponses clusterNodeResponses =
605                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
606    
607                            ClusterNodeResponse clusterNodeResponse =
608                                    clusterNodeResponses.getClusterResponse(address);
609    
610                            return clusterNodeResponse.getResult();
611                    }
612                    catch (Exception e) {
613                            throw new SchedulerException(
614                                    "Unable to load scheduled jobs from cluster node " +
615                                            address.getDescription(),
616                                    e);
617                    }
618            }
619    
620            protected Object getDeserializedObject(String string) throws Exception {
621                    byte[] bytes = Base64.decode(string);
622    
623                    UnsyncByteArrayInputStream byteArrayInputStream =
624                            new UnsyncByteArrayInputStream(bytes);
625    
626                    ObjectInputStream objectInputStream = new ObjectInputStream(
627                            byteArrayInputStream);
628    
629                    Object object = objectInputStream.readObject();
630    
631                    objectInputStream.close();
632    
633                    return object;
634            }
635    
636            protected String getFullName(String jobName, String groupName) {
637                    return groupName.concat(StringPool.PERIOD).concat(jobName);
638            }
639    
640            protected String getSerializedString(Object object) throws Exception {
641                    UnsyncByteArrayOutputStream byteArrayOutputStream =
642                            new UnsyncByteArrayOutputStream();
643    
644                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
645                            byteArrayOutputStream);
646    
647                    objectOutputStream.writeObject(object);
648                    objectOutputStream.close();
649    
650                    byte[] bytes = byteArrayOutputStream.toByteArray();
651    
652                    return Base64.encode(bytes);
653            }
654    
655            protected StorageType getStorageType(String groupName) {
656                    int pos = groupName.indexOf(CharPool.POUND);
657    
658                    String storageTypeString = groupName.substring(0, pos);
659    
660                    return StorageType.valueOf(storageTypeString);
661            }
662    
663            protected void initMemoryClusteredJobs() throws Exception {
664                    List<SchedulerResponse> schedulerResponses =
665                            (List<SchedulerResponse>)callMaster(
666                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
667    
668                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
669                            Trigger oldTrigger = schedulerResponse.getTrigger();
670    
671                            String jobName = schedulerResponse.getJobName();
672                            String groupName = SchedulerEngineUtil.namespaceGroupName(
673                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
674    
675                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
676                                    oldTrigger.getTriggerType(), jobName, groupName,
677                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
678                                    oldTrigger.getTriggerContent());
679    
680                            schedulerResponse.setTrigger(newTrigger);
681    
682                            TriggerState triggerState = SchedulerEngineUtil.getJobState(
683                                    schedulerResponse);
684    
685                            Message message = schedulerResponse.getMessage();
686    
687                            message.remove(JOB_STATE);
688    
689                            _memoryClusteredJobs.put(
690                                    getFullName(jobName, groupName),
691                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
692                                            schedulerResponse, triggerState));
693                    }
694            }
695    
696            protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
697                    throws Exception {
698    
699                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
700    
701                    if (master == _master) {
702                            return master;
703                    }
704    
705                    if (!_master) {
706                            _master = master;
707    
708                            return _master;
709                    }
710    
711                    _localClusterNodeAddress = getSerializedString(
712                            ClusterExecutorUtil.getLocalClusterNodeAddress());
713    
714                    for (ObjectValuePair<SchedulerResponse, TriggerState>
715                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
716    
717                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
718    
719                            _schedulerEngine.delete(
720                                    schedulerResponse.getJobName(),
721                                    schedulerResponse.getGroupName());
722                    }
723    
724                    initMemoryClusteredJobs();
725    
726                    if (_log.isInfoEnabled()) {
727                            _log.info("Another node is now the memory scheduler master");
728                    }
729    
730                    _master = master;
731    
732                    return master;
733            }
734    
735            protected boolean isMemorySchedulerSlave() throws Exception {
736                    return isMemorySchedulerSlave(null);
737            }
738    
739            protected boolean isMemorySchedulerSlave(String groupName)
740                    throws Exception {
741    
742                    if (groupName != null) {
743                            StorageType storageType = getStorageType(groupName);
744    
745                            if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
746                                    return false;
747                            }
748                    }
749    
750                    Lock lock = lockMemorySchedulerCluster(null);
751    
752                    if (isMemorySchedulerClusterLockOwner(lock)) {
753                            return false;
754                    }
755    
756                    return true;
757            }
758    
759            protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
760                    Lock lock = null;
761    
762                    while (true) {
763                            try {
764                                    if (owner == null) {
765                                            lock = LockLocalServiceUtil.lock(
766                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
767                                                    _localClusterNodeAddress,
768                                                    PropsValues.
769                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
770                                    }
771                                    else {
772                                            lock = LockLocalServiceUtil.lock(
773                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
774                                                    _localClusterNodeAddress,
775                                                    PropsValues.
776                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
777                                    }
778    
779                                    break;
780                            }
781                            catch (Exception e) {
782                                    if (_log.isWarnEnabled()) {
783                                            _log.warn(
784                                                    "Unable to obtain memory scheduler cluster lock. " +
785                                                            "Trying again.");
786                                    }
787                            }
788                    }
789    
790                    if (!lock.isNew()) {
791                            return lock;
792                    }
793    
794                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
795    
796                    ProxyModeThreadLocal.setForceSync(true);
797    
798                    _writeLock.lock();
799    
800                    try {
801                            for (ObjectValuePair<SchedulerResponse, TriggerState>
802                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
803    
804                                    SchedulerResponse schedulerResponse =
805                                            memoryClusteredJob.getKey();
806    
807                                    _schedulerEngine.schedule(
808                                            schedulerResponse.getTrigger(),
809                                            schedulerResponse.getDescription(),
810                                            schedulerResponse.getDestinationName(),
811                                            schedulerResponse.getMessage());
812    
813                                    TriggerState triggerState = memoryClusteredJob.getValue();
814    
815                                    if (triggerState.equals(TriggerState.PAUSED)) {
816                                            _schedulerEngine.pause(
817                                                    schedulerResponse.getJobName(),
818                                                    schedulerResponse.getGroupName());
819                                    }
820                            }
821                    }
822                    finally {
823                            ProxyModeThreadLocal.setForceSync(forceSync);
824    
825                            _writeLock.unlock();
826                    }
827    
828                    return lock;
829            }
830    
831            protected void removeMemoryClusteredJobs(String groupName) {
832                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
833                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
834    
835                    Iterator
836                            <Map.Entry<String,
837                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
838                                            memoryClusteredJobs.iterator();
839    
840                    while (itr.hasNext()) {
841                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
842                                    entry = itr.next();
843    
844                            ObjectValuePair<SchedulerResponse, TriggerState>
845                                    memoryClusteredJob = entry.getValue();
846    
847                            SchedulerResponse schedulerResponse =
848                                    memoryClusteredJob.getKey();
849    
850                            if (groupName.equals(schedulerResponse.getGroupName())) {
851                                    itr.remove();
852                            }
853                    }
854            }
855    
856            protected void skipClusterInvoking(String groupName)
857                    throws SchedulerException {
858    
859                    StorageType storageType = getStorageType(groupName);
860    
861                    if (storageType.equals(StorageType.PERSISTED)) {
862                            SchedulerException schedulerException = new SchedulerException();
863    
864                            schedulerException.setSwallowable(true);
865    
866                            throw schedulerException;
867                    }
868            }
869    
870            protected void updateMemoryClusteredJob(
871                    String jobName, String groupName, TriggerState triggerState) {
872    
873                    ObjectValuePair<SchedulerResponse, TriggerState>
874                            memoryClusteredJob = _memoryClusteredJobs.get(
875                                    getFullName(jobName, groupName));
876    
877                    if (memoryClusteredJob != null) {
878                            memoryClusteredJob.setValue(triggerState);
879                    }
880            }
881    
882            protected void updateMemoryClusteredJobs(
883                    String groupName, TriggerState triggerState) {
884    
885                    for (ObjectValuePair<SchedulerResponse, TriggerState>
886                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
887    
888                            SchedulerResponse schedulerResponse =
889                                    memoryClusteredJob.getKey();
890    
891                            if (groupName.equals(schedulerResponse.getGroupName())) {
892                                    memoryClusteredJob.setValue(triggerState);
893                            }
894                    }
895            }
896    
897            @BeanReference(
898                    name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
899            protected SchedulerEngine schedulerEngine;
900    
901            private static final String _LOCK_CLASS_NAME =
902                    SchedulerEngine.class.getName();
903    
904            private static Log _log = LogFactoryUtil.getLog(
905                    ClusterSchedulerEngine.class);
906    
907            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
908                    SchedulerEngine.class.getName(), "getScheduledJob", String.class,
909                    String.class);
910            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
911                    SchedulerEngine.class.getName(), "getScheduledJobs");
912            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
913                    SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
914            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
915                    SchedulerEngineUtil.class.getName(), "getScheduledJobs",
916                    StorageType.class);
917    
918            private String _beanIdentifier;
919            private ClusterEventListener _clusterEventListener;
920            private volatile String _localClusterNodeAddress;
921            private volatile boolean _master;
922            private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
923                    _memoryClusteredJobs = new ConcurrentHashMap
924                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
925            private java.util.concurrent.locks.Lock _readLock;
926            private SchedulerEngine _schedulerEngine;
927            private java.util.concurrent.locks.Lock _writeLock;
928    
929            private class MemorySchedulerClusterEventListener
930                    implements ClusterEventListener {
931    
932                    public void processClusterEvent(ClusterEvent clusterEvent) {
933                            ClusterEventType clusterEventType =
934                                    clusterEvent.getClusterEventType();
935    
936                            if (!clusterEventType.equals(ClusterEventType.DEPART)) {
937                                    return;
938                            }
939    
940                            try {
941                                    updateMemorySchedulerClusterMaster();
942                            }
943                            catch (Exception e) {
944                                    _log.error("Unable to update memory scheduler cluster lock", e);
945                            }
946                    }
947    
948            }
949    
950    }