001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.IdentifiableBean;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterEvent;
021    import com.liferay.portal.kernel.cluster.ClusterEventListener;
022    import com.liferay.portal.kernel.cluster.ClusterEventType;
023    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.Clusterable;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.messaging.Message;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037    import com.liferay.portal.kernel.scheduler.SchedulerException;
038    import com.liferay.portal.kernel.scheduler.StorageType;
039    import com.liferay.portal.kernel.scheduler.Trigger;
040    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041    import com.liferay.portal.kernel.scheduler.TriggerState;
042    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043    import com.liferay.portal.kernel.util.Base64;
044    import com.liferay.portal.kernel.util.CharPool;
045    import com.liferay.portal.kernel.util.MethodHandler;
046    import com.liferay.portal.kernel.util.MethodKey;
047    import com.liferay.portal.kernel.util.ObjectValuePair;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050    import com.liferay.portal.model.Lock;
051    import com.liferay.portal.service.LockLocalServiceUtil;
052    import com.liferay.portal.util.PropsValues;
053    
054    import java.io.ObjectInputStream;
055    import java.io.ObjectOutputStream;
056    
057    import java.util.Collections;
058    import java.util.Iterator;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.concurrent.ConcurrentHashMap;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.locks.ReadWriteLock;
065    import java.util.concurrent.locks.ReentrantReadWriteLock;
066    
067    /**
068     * @author Tina Tian
069     */
070    public class ClusterSchedulerEngine
071            implements IdentifiableBean, SchedulerEngine,
072                               SchedulerEngineClusterManager {
073    
074            public static SchedulerEngine createClusterSchedulerEngine(
075                    SchedulerEngine schedulerEngine) {
076    
077                    if (PropsValues.CLUSTER_LINK_ENABLED) {
078                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
079                    }
080    
081                    return schedulerEngine;
082            }
083    
084            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
085                    _schedulerEngine = schedulerEngine;
086            }
087    
088            @Clusterable
089            public void delete(String groupName) throws SchedulerException {
090                    if (!PropsValues.SCHEDULER_ENABLED) {
091                            return;
092                    }
093    
094                    try {
095                            if (isMemorySchedulerSlave(groupName)) {
096                                    removeMemoryClusteredJobs(groupName);
097    
098                                    return;
099                            }
100                    }
101                    catch (Exception e) {
102                            throw new SchedulerException(
103                                    "Unable to delete jobs in group " + groupName, e);
104                    }
105    
106                    _readLock.lock();
107    
108                    try {
109                            _schedulerEngine.delete(groupName);
110                    }
111                    finally {
112                            _readLock.unlock();
113                    }
114    
115                    skipClusterInvoking(groupName);
116            }
117    
118            @Clusterable
119            public void delete(String jobName, String groupName)
120                    throws SchedulerException {
121    
122                    if (!PropsValues.SCHEDULER_ENABLED) {
123                            return;
124                    }
125    
126                    try {
127                            if (isMemorySchedulerSlave(groupName)) {
128                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
129    
130                                    return;
131                            }
132                    }
133                    catch (Exception e) {
134                            throw new SchedulerException(
135                                    "Unable to delete job {jobName=" + jobName + ", groupName=" +
136                                            groupName + "}",
137                                    e);
138                    }
139    
140                    _readLock.lock();
141    
142                    try {
143                            _schedulerEngine.delete(jobName, groupName);
144                    }
145                    finally {
146                            _readLock.unlock();
147                    }
148    
149                    skipClusterInvoking(groupName);
150            }
151    
152            public String getBeanIdentifier() {
153                    return _beanIdentifier;
154            }
155    
156            public SchedulerResponse getScheduledJob(String jobName, String groupName)
157                    throws SchedulerException {
158    
159                    if (!PropsValues.SCHEDULER_ENABLED) {
160                            return null;
161                    }
162    
163                    try {
164                            if (isMemorySchedulerSlave(groupName)) {
165                                    return (SchedulerResponse)callMaster(
166                                            _getScheduledJobMethodKey, jobName, groupName);
167                            }
168                    }
169                    catch (Exception e) {
170                            throw new SchedulerException(
171                                    "Unable to get job {jobName=" + jobName + ", groupName=" +
172                                            groupName + "}",
173                                    e);
174                    }
175    
176                    _readLock.lock();
177    
178                    try {
179                            return _schedulerEngine.getScheduledJob(jobName, groupName);
180                    }
181                    finally {
182                            _readLock.unlock();
183                    }
184            }
185    
186            public List<SchedulerResponse> getScheduledJobs()
187                    throws SchedulerException {
188    
189                    if (!PropsValues.SCHEDULER_ENABLED) {
190                            return Collections.emptyList();
191                    }
192    
193                    try {
194                            if (isMemorySchedulerSlave()) {
195                                    return (List<SchedulerResponse>)callMaster(
196                                            _getScheduledJobsMethodKey1);
197                            }
198                    }
199                    catch (Exception e) {
200                            throw new SchedulerException("Unable to get jobs", e);
201                    }
202    
203                    _readLock.lock();
204    
205                    try {
206                            return _schedulerEngine.getScheduledJobs();
207                    }
208                    finally {
209                            _readLock.unlock();
210                    }
211            }
212    
213            public List<SchedulerResponse> getScheduledJobs(String groupName)
214                    throws SchedulerException {
215    
216                    if (!PropsValues.SCHEDULER_ENABLED) {
217                            return Collections.emptyList();
218                    }
219    
220                    try {
221                            if (isMemorySchedulerSlave(groupName)) {
222                                    return (List<SchedulerResponse>)callMaster(
223                                            _getScheduledJobsMethodKey2, groupName);
224                            }
225                    }
226                    catch (Exception e) {
227                            throw new SchedulerException(
228                                    "Unable to get jobs in group " + groupName, e);
229                    }
230    
231                    _readLock.lock();
232    
233                    try {
234                            return _schedulerEngine.getScheduledJobs(groupName);
235                    }
236                    finally {
237                            _readLock.unlock();
238                    }
239            }
240    
241            @Clusterable
242            public void pause(String groupName) throws SchedulerException {
243                    if (!PropsValues.SCHEDULER_ENABLED) {
244                            return;
245                    }
246    
247                    try {
248                            if (isMemorySchedulerSlave(groupName)) {
249                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
250    
251                                    return;
252                            }
253                    }
254                    catch (Exception e) {
255                            throw new SchedulerException(
256                                    "Unable to pause jobs in group " + groupName, e);
257                    }
258    
259                    _readLock.lock();
260    
261                    try {
262                            _schedulerEngine.pause(groupName);
263                    }
264                    finally {
265                            _readLock.unlock();
266                    }
267    
268                    skipClusterInvoking(groupName);
269            }
270    
271            @Clusterable
272            public void pause(String jobName, String groupName)
273                    throws SchedulerException {
274    
275                    if (!PropsValues.SCHEDULER_ENABLED) {
276                            return;
277                    }
278    
279                    try {
280                            if (isMemorySchedulerSlave(groupName)) {
281                                    updateMemoryClusteredJob(
282                                            jobName, groupName, TriggerState.PAUSED);
283    
284                                    return;
285                            }
286                    }
287                    catch (Exception e) {
288                            throw new SchedulerException(
289                                    "Unable to pause job {jobName=" + jobName + ", groupName=" +
290                                            groupName + "}",
291                                    e);
292                    }
293    
294                    _readLock.lock();
295    
296                    try {
297                            _schedulerEngine.pause(jobName, groupName);
298                    }
299                    finally {
300                            _readLock.unlock();
301                    }
302    
303                    skipClusterInvoking(groupName);
304            }
305    
306            @Clusterable
307            public void resume(String groupName) throws SchedulerException {
308                    if (!PropsValues.SCHEDULER_ENABLED) {
309                            return;
310                    }
311    
312                    try {
313                            if (isMemorySchedulerSlave(groupName)) {
314                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
315    
316                                    return;
317                            }
318                    }
319                    catch (Exception e) {
320                            throw new SchedulerException(
321                                    "Unable to resume jobs in group " + groupName, e);
322                    }
323    
324                    _readLock.lock();
325    
326                    try {
327                            _schedulerEngine.resume(groupName);
328                    }
329                    finally {
330                            _readLock.unlock();
331                    }
332    
333                    skipClusterInvoking(groupName);
334            }
335    
336            @Clusterable
337            public void resume(String jobName, String groupName)
338                    throws SchedulerException {
339    
340                    if (!PropsValues.SCHEDULER_ENABLED) {
341                            return;
342                    }
343    
344                    try {
345                            if (isMemorySchedulerSlave(groupName)) {
346                                    updateMemoryClusteredJob(
347                                            jobName, groupName, TriggerState.NORMAL);
348    
349                                    return;
350                            }
351                    }
352                    catch (Exception e) {
353                            throw new SchedulerException(
354                                    "Unable to resume job {jobName=" + jobName + ", groupName=" +
355                                            groupName + "}",
356                                    e);
357                    }
358    
359                    _readLock.lock();
360    
361                    try {
362                            _schedulerEngine.resume(jobName, groupName);
363                    }
364                    finally {
365                            _readLock.unlock();
366                    }
367    
368                    skipClusterInvoking(groupName);
369            }
370    
371            @Clusterable
372            public void schedule(
373                            Trigger trigger, String description, String destinationName,
374                            Message message)
375                    throws SchedulerException {
376    
377                    if (!PropsValues.SCHEDULER_ENABLED) {
378                            return;
379                    }
380    
381                    String groupName = trigger.getGroupName();
382                    String jobName = trigger.getJobName();
383    
384                    try {
385                            if (isMemorySchedulerSlave(groupName)) {
386                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
387    
388                                    schedulerResponse.setDescription(description);
389                                    schedulerResponse.setDestinationName(destinationName);
390                                    schedulerResponse.setGroupName(groupName);
391                                    schedulerResponse.setJobName(jobName);
392                                    schedulerResponse.setMessage(message);
393                                    schedulerResponse.setTrigger(trigger);
394    
395                                    _memoryClusteredJobs.put(
396                                            getFullName(jobName, groupName),
397                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
398                                                    schedulerResponse, TriggerState.NORMAL));
399    
400                                    return;
401                            }
402                    }
403                    catch (Exception e) {
404                            throw new SchedulerException(
405                                    "Unable to schedule job {jobName=" + jobName + ", groupName=" +
406                                            groupName + "}",
407                                    e);
408                    }
409    
410                    _readLock.lock();
411    
412                    try {
413                            _schedulerEngine.schedule(
414                                    trigger, description, destinationName, message);
415                    }
416                    finally {
417                            _readLock.unlock();
418                    }
419    
420                    skipClusterInvoking(groupName);
421            }
422    
423            public void setBeanIdentifier(String beanIdentifier) {
424                    _beanIdentifier = beanIdentifier;
425            }
426    
427            public void shutdown() throws SchedulerException {
428                    if (!PropsValues.SCHEDULER_ENABLED) {
429                            return;
430                    }
431    
432                    try {
433                            ClusterExecutorUtil.removeClusterEventListener(
434                                    _clusterEventListener);
435    
436                            LockLocalServiceUtil.unlock(
437                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
438                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
439                    }
440                    catch (Exception e) {
441                            throw new SchedulerException("Unable to shutdown scheduler", e);
442                    }
443    
444                    _schedulerEngine.shutdown();
445            }
446    
447            public void start() throws SchedulerException {
448                    if (!PropsValues.SCHEDULER_ENABLED) {
449                            return;
450                    }
451    
452                    try {
453                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
454    
455                            _readLock = readWriteLock.readLock();
456                            _writeLock = readWriteLock.writeLock();
457    
458                            _localClusterNodeAddress = getSerializedString(
459                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
460    
461                            _clusterEventListener = new MemorySchedulerClusterEventListener();
462    
463                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
464    
465                            if (!isMemorySchedulerClusterLockOwner(
466                                            lockMemorySchedulerCluster(null))) {
467    
468                                    initMemoryClusteredJobs();
469                            }
470                    }
471                    catch (Exception e) {
472                            throw new SchedulerException("Unable to start scheduler", e);
473                    }
474    
475                    _schedulerEngine.start();
476            }
477    
478            @Clusterable
479            public void suppressError(String jobName, String groupName)
480                    throws SchedulerException {
481    
482                    if (!PropsValues.SCHEDULER_ENABLED) {
483                            return;
484                    }
485    
486                    try {
487                            if (isMemorySchedulerSlave(groupName)) {
488                                    return;
489                            }
490                    }
491                    catch (Exception e) {
492                            throw new SchedulerException(
493                                    "Unable to suppress error for job {jobName=" + jobName +
494                                            ", groupName=" + groupName + "}",
495                                    e);
496                    }
497    
498                    _readLock.lock();
499    
500                    try {
501                            _schedulerEngine.suppressError(jobName, groupName);
502                    }
503                    finally {
504                            _readLock.unlock();
505                    }
506    
507                    skipClusterInvoking(groupName);
508            }
509    
510            @Clusterable
511            public void unschedule(String groupName) throws SchedulerException {
512                    if (!PropsValues.SCHEDULER_ENABLED) {
513                            return;
514                    }
515    
516                    try {
517                            if (isMemorySchedulerSlave(groupName)) {
518                                    removeMemoryClusteredJobs(groupName);
519    
520                                    return;
521                            }
522                    }
523                    catch (Exception e) {
524                            throw new SchedulerException(
525                                    "Unable to unschedule jobs in group " + groupName, e);
526                    }
527    
528                    _readLock.lock();
529    
530                    try {
531                            _schedulerEngine.unschedule(groupName);
532                    }
533                    finally {
534                            _readLock.unlock();
535                    }
536    
537                    skipClusterInvoking(groupName);
538            }
539    
540            @Clusterable
541            public void unschedule(String jobName, String groupName)
542                    throws SchedulerException {
543    
544                    if (!PropsValues.SCHEDULER_ENABLED) {
545                            return;
546                    }
547    
548                    try {
549                            if (isMemorySchedulerSlave(groupName)) {
550                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
551    
552                                    return;
553                            }
554                    }
555                    catch (Exception e) {
556                            throw new SchedulerException(
557                                    "Unable to unschedule job {jobName=" + jobName +
558                                            ", groupName=" + groupName + "}",
559                                    e);
560                    }
561    
562                    _readLock.lock();
563    
564                    try {
565                            _schedulerEngine.unschedule(jobName, groupName);
566                    }
567                    finally {
568                            _readLock.unlock();
569                    }
570    
571                    skipClusterInvoking(groupName);
572            }
573    
574            @Clusterable
575            public void update(Trigger trigger) throws SchedulerException {
576                    if (!PropsValues.SCHEDULER_ENABLED) {
577                            return;
578                    }
579    
580                    String jobName = trigger.getJobName();
581                    String groupName = trigger.getGroupName();
582    
583                    try {
584                            if (isMemorySchedulerSlave(groupName)) {
585                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
586                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
587    
588                                            SchedulerResponse schedulerResponse =
589                                                    memoryClusteredJob.getKey();
590    
591                                            if (jobName.equals(schedulerResponse.getJobName()) &&
592                                                    groupName.equals(schedulerResponse.getGroupName())) {
593    
594                                                    schedulerResponse.setTrigger(trigger);
595    
596                                                    return;
597                                            }
598                                    }
599    
600                                    throw new Exception(
601                                            "Unable to update trigger for memory clustered job");
602                            }
603                    }
604                    catch (Exception e) {
605                            throw new SchedulerException(
606                                    "Unable to update job {jobName=" + jobName + ", groupName=" +
607                                            groupName + "}",
608                                    e);
609                    }
610    
611                    _readLock.lock();
612    
613                    try {
614                            _schedulerEngine.update(trigger);
615                    }
616                    finally {
617                            _readLock.unlock();
618                    }
619    
620                    skipClusterInvoking(groupName);
621            }
622    
623            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
624                    try {
625                            Lock lock = lockMemorySchedulerCluster(null);
626    
627                            Address address = (Address)getDeserializedObject(lock.getOwner());
628    
629                            if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
630                                    return lock;
631                            }
632    
633                            return lockMemorySchedulerCluster(lock.getOwner());
634                    }
635                    catch (Exception e) {
636                            throw new SchedulerException(
637                                    "Unable to update memory scheduler cluster master", e);
638                    }
639            }
640    
641            protected Object callMaster(MethodKey methodKey, Object... arguments)
642                    throws Exception {
643    
644                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
645    
646                    Lock lock = updateMemorySchedulerClusterMaster();
647    
648                    Address address = (Address)getDeserializedObject(lock.getOwner());
649    
650                    if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
651                            if (methodKey == _getScheduledJobsMethodKey3) {
652                                    return methodHandler.invoke(false);
653                            }
654                            else {
655                                    return methodHandler.invoke(schedulerEngine);
656                            }
657                    }
658    
659                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
660                            methodHandler, address);
661    
662                    clusterRequest.setBeanIdentifier(_beanIdentifier);
663    
664                    FutureClusterResponses futureClusterResponses =
665                            ClusterExecutorUtil.execute(clusterRequest);
666    
667                    try {
668                            ClusterNodeResponses clusterNodeResponses =
669                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
670    
671                            ClusterNodeResponse clusterNodeResponse =
672                                    clusterNodeResponses.getClusterResponse(address);
673    
674                            return clusterNodeResponse.getResult();
675                    }
676                    catch (Exception e) {
677                            throw new SchedulerException(
678                                    "Unable to load scheduled jobs from cluster node " +
679                                            address.getDescription(),
680                                    e);
681                    }
682            }
683    
684            protected Object getDeserializedObject(String string) throws Exception {
685                    byte[] bytes = Base64.decode(string);
686    
687                    UnsyncByteArrayInputStream byteArrayInputStream =
688                            new UnsyncByteArrayInputStream(bytes);
689    
690                    ObjectInputStream objectInputStream = new ObjectInputStream(
691                            byteArrayInputStream);
692    
693                    Object object = objectInputStream.readObject();
694    
695                    objectInputStream.close();
696    
697                    return object;
698            }
699    
700            protected String getFullName(String jobName, String groupName) {
701                    return groupName.concat(StringPool.PERIOD).concat(jobName);
702            }
703    
704            protected String getSerializedString(Object object) throws Exception {
705                    UnsyncByteArrayOutputStream byteArrayOutputStream =
706                            new UnsyncByteArrayOutputStream();
707    
708                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
709                            byteArrayOutputStream);
710    
711                    objectOutputStream.writeObject(object);
712                    objectOutputStream.close();
713    
714                    byte[] bytes = byteArrayOutputStream.toByteArray();
715    
716                    return Base64.encode(bytes);
717            }
718    
719            protected StorageType getStorageType(String groupName) {
720                    int pos = groupName.indexOf(CharPool.POUND);
721    
722                    String storageTypeString = groupName.substring(0, pos);
723    
724                    return StorageType.valueOf(storageTypeString);
725            }
726    
727            protected void initMemoryClusteredJobs() throws Exception {
728                    List<SchedulerResponse> schedulerResponses =
729                            (List<SchedulerResponse>)callMaster(
730                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
731    
732                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
733                            Trigger oldTrigger = schedulerResponse.getTrigger();
734    
735                            String jobName = schedulerResponse.getJobName();
736                            String groupName = SchedulerEngineUtil.namespaceGroupName(
737                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
738    
739                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
740                                    oldTrigger.getTriggerType(), jobName, groupName,
741                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
742                                    oldTrigger.getTriggerContent());
743    
744                            schedulerResponse.setTrigger(newTrigger);
745    
746                            TriggerState triggerState = SchedulerEngineUtil.getJobState(
747                                    schedulerResponse);
748    
749                            Message message = schedulerResponse.getMessage();
750    
751                            message.remove(JOB_STATE);
752    
753                            _memoryClusteredJobs.put(
754                                    getFullName(jobName, groupName),
755                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
756                                            schedulerResponse, triggerState));
757                    }
758            }
759    
760            protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
761                    throws Exception {
762    
763                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
764    
765                    if (master == _master) {
766                            return master;
767                    }
768    
769                    if (!_master) {
770                            _master = master;
771    
772                            return _master;
773                    }
774    
775                    _localClusterNodeAddress = getSerializedString(
776                            ClusterExecutorUtil.getLocalClusterNodeAddress());
777    
778                    for (ObjectValuePair<SchedulerResponse, TriggerState>
779                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
780    
781                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
782    
783                            _schedulerEngine.delete(
784                                    schedulerResponse.getJobName(),
785                                    schedulerResponse.getGroupName());
786                    }
787    
788                    initMemoryClusteredJobs();
789    
790                    if (_log.isInfoEnabled()) {
791                            _log.info("Another node is now the memory scheduler master");
792                    }
793    
794                    _master = master;
795    
796                    return master;
797            }
798    
799            protected boolean isMemorySchedulerSlave() throws Exception {
800                    return isMemorySchedulerSlave(null);
801            }
802    
803            protected boolean isMemorySchedulerSlave(String groupName)
804                    throws Exception {
805    
806                    if (groupName != null) {
807                            StorageType storageType = getStorageType(groupName);
808    
809                            if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
810                                    return false;
811                            }
812                    }
813    
814                    Lock lock = lockMemorySchedulerCluster(null);
815    
816                    if (isMemorySchedulerClusterLockOwner(lock)) {
817                            return false;
818                    }
819    
820                    return true;
821            }
822    
823            protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
824                    Lock lock = null;
825    
826                    while (true) {
827                            try {
828                                    if (owner == null) {
829                                            lock = LockLocalServiceUtil.lock(
830                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
831                                                    _localClusterNodeAddress,
832                                                    PropsValues.
833                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
834                                    }
835                                    else {
836                                            lock = LockLocalServiceUtil.lock(
837                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
838                                                    _localClusterNodeAddress,
839                                                    PropsValues.
840                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
841                                    }
842    
843                                    break;
844                            }
845                            catch (Exception e) {
846                                    if (_log.isWarnEnabled()) {
847                                            _log.warn(
848                                                    "Unable to obtain memory scheduler cluster lock. " +
849                                                            "Trying again.");
850                                    }
851                            }
852                    }
853    
854                    if (!lock.isNew()) {
855                            return lock;
856                    }
857    
858                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
859    
860                    ProxyModeThreadLocal.setForceSync(true);
861    
862                    _writeLock.lock();
863    
864                    try {
865                            for (ObjectValuePair<SchedulerResponse, TriggerState>
866                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
867    
868                                    SchedulerResponse schedulerResponse =
869                                            memoryClusteredJob.getKey();
870    
871                                    _schedulerEngine.schedule(
872                                            schedulerResponse.getTrigger(),
873                                            schedulerResponse.getDescription(),
874                                            schedulerResponse.getDestinationName(),
875                                            schedulerResponse.getMessage());
876    
877                                    TriggerState triggerState = memoryClusteredJob.getValue();
878    
879                                    if (triggerState.equals(TriggerState.PAUSED)) {
880                                            _schedulerEngine.pause(
881                                                    schedulerResponse.getJobName(),
882                                                    schedulerResponse.getGroupName());
883                                    }
884                            }
885                    }
886                    finally {
887                            ProxyModeThreadLocal.setForceSync(forceSync);
888    
889                            _writeLock.unlock();
890                    }
891    
892                    return lock;
893            }
894    
895            protected void removeMemoryClusteredJobs(String groupName) {
896                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
897                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
898    
899                    Iterator
900                            <Map.Entry<String,
901                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
902                                            memoryClusteredJobs.iterator();
903    
904                    while (itr.hasNext()) {
905                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
906                                    entry = itr.next();
907    
908                            ObjectValuePair<SchedulerResponse, TriggerState>
909                                    memoryClusteredJob = entry.getValue();
910    
911                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
912    
913                            if (groupName.equals(schedulerResponse.getGroupName())) {
914                                    itr.remove();
915                            }
916                    }
917            }
918    
919            protected void skipClusterInvoking(String groupName)
920                    throws SchedulerException {
921    
922                    StorageType storageType = getStorageType(groupName);
923    
924                    if (storageType.equals(StorageType.PERSISTED)) {
925                            SchedulerException schedulerException = new SchedulerException();
926    
927                            schedulerException.setSwallowable(true);
928    
929                            throw schedulerException;
930                    }
931            }
932    
933            protected void updateMemoryClusteredJob(
934                    String jobName, String groupName, TriggerState triggerState) {
935    
936                    ObjectValuePair<SchedulerResponse, TriggerState>
937                            memoryClusteredJob = _memoryClusteredJobs.get(
938                                    getFullName(jobName, groupName));
939    
940                    if (memoryClusteredJob != null) {
941                            memoryClusteredJob.setValue(triggerState);
942                    }
943            }
944    
945            protected void updateMemoryClusteredJobs(
946                    String groupName, TriggerState triggerState) {
947    
948                    for (ObjectValuePair<SchedulerResponse, TriggerState>
949                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
950    
951                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
952    
953                            if (groupName.equals(schedulerResponse.getGroupName())) {
954                                    memoryClusteredJob.setValue(triggerState);
955                            }
956                    }
957            }
958    
959            @BeanReference(
960                    name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
961            protected SchedulerEngine schedulerEngine;
962    
963            private static final String _LOCK_CLASS_NAME =
964                    SchedulerEngine.class.getName();
965    
966            private static Log _log = LogFactoryUtil.getLog(
967                    ClusterSchedulerEngine.class);
968    
969            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
970                    SchedulerEngine.class.getName(), "getScheduledJob", String.class,
971                    String.class);
972            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
973                    SchedulerEngine.class.getName(), "getScheduledJobs");
974            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
975                    SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
976            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
977                    SchedulerEngineUtil.class.getName(), "getScheduledJobs",
978                    StorageType.class);
979    
980            private String _beanIdentifier;
981            private ClusterEventListener _clusterEventListener;
982            private volatile String _localClusterNodeAddress;
983            private volatile boolean _master;
984            private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
985                    _memoryClusteredJobs = new ConcurrentHashMap
986                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
987            private java.util.concurrent.locks.Lock _readLock;
988            private SchedulerEngine _schedulerEngine;
989            private java.util.concurrent.locks.Lock _writeLock;
990    
991            private class MemorySchedulerClusterEventListener
992                    implements ClusterEventListener {
993    
994                    public void processClusterEvent(ClusterEvent clusterEvent) {
995                            ClusterEventType clusterEventType =
996                                    clusterEvent.getClusterEventType();
997    
998                            if (!clusterEventType.equals(ClusterEventType.DEPART)) {
999                                    return;
1000                            }
1001    
1002                            try {
1003                                    updateMemorySchedulerClusterMaster();
1004                            }
1005                            catch (Exception e) {
1006                                    _log.error("Unable to update memory scheduler cluster lock", e);
1007                            }
1008                    }
1009    
1010            }
1011    
1012    }