001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.IdentifiableBean;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
021    import com.liferay.portal.kernel.cluster.ClusterEvent;
022    import com.liferay.portal.kernel.cluster.ClusterEventListener;
023    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.Clusterable;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.messaging.Message;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037    import com.liferay.portal.kernel.scheduler.SchedulerException;
038    import com.liferay.portal.kernel.scheduler.StorageType;
039    import com.liferay.portal.kernel.scheduler.Trigger;
040    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041    import com.liferay.portal.kernel.scheduler.TriggerState;
042    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
044    import com.liferay.portal.kernel.util.Base64;
045    import com.liferay.portal.kernel.util.CharPool;
046    import com.liferay.portal.kernel.util.MethodHandler;
047    import com.liferay.portal.kernel.util.MethodKey;
048    import com.liferay.portal.kernel.util.ObjectValuePair;
049    import com.liferay.portal.kernel.util.StringPool;
050    import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
051    import com.liferay.portal.model.Lock;
052    import com.liferay.portal.service.LockLocalServiceUtil;
053    import com.liferay.portal.util.PropsValues;
054    
055    import java.io.ObjectInputStream;
056    import java.io.ObjectOutputStream;
057    
058    import java.util.Iterator;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.concurrent.ConcurrentHashMap;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.TimeoutException;
065    import java.util.concurrent.locks.ReadWriteLock;
066    import java.util.concurrent.locks.ReentrantReadWriteLock;
067    
068    /**
069     * @author Tina Tian
070     */
071    public class ClusterSchedulerEngine
072            implements IdentifiableBean, SchedulerEngine,
073                               SchedulerEngineClusterManager {
074    
075            public static SchedulerEngine createClusterSchedulerEngine(
076                    SchedulerEngine schedulerEngine) {
077    
078                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080                    }
081    
082                    return schedulerEngine;
083            }
084    
/**
 * Creates a cluster scheduler engine that forwards scheduler operations to
 * the given engine.
 *
 * @param schedulerEngine the wrapped scheduler engine
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
088    
089            @Clusterable
090            public void delete(String groupName) throws SchedulerException {
091                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
092    
093                    _readLock.lock();
094    
095                    try {
096                            if (memoryClusteredSlaveJob) {
097                                    removeMemoryClusteredJobs(groupName);
098                            }
099                            else {
100                                    _schedulerEngine.delete(groupName);
101                            }
102                    }
103                    finally {
104                            _readLock.unlock();
105                    }
106    
107                    skipClusterInvoking(groupName);
108            }
109    
110            @Clusterable
111            public void delete(String jobName, String groupName)
112                    throws SchedulerException {
113    
114                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
115    
116                    _readLock.lock();
117    
118                    try {
119                            if (memoryClusteredSlaveJob) {
120                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
121                            }
122                            else {
123                                    _schedulerEngine.delete(jobName, groupName);
124                            }
125                    }
126                    finally {
127                            _readLock.unlock();
128                    }
129    
130                    skipClusterInvoking(groupName);
131            }
132    
133            public String getBeanIdentifier() {
134                    return _beanIdentifier;
135            }
136    
137            public SchedulerResponse getScheduledJob(String jobName, String groupName)
138                    throws SchedulerException {
139    
140                    StorageType storageType = getStorageType(groupName);
141    
142                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
143                            String masterAddressString = getMasterAddressString(false);
144    
145                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
146                                    return (SchedulerResponse)callMaster(
147                                            masterAddressString, _getScheduledJobMethodKey, jobName,
148                                            groupName);
149                            }
150                    }
151    
152                    _readLock.lock();
153    
154                    try {
155                            return _schedulerEngine.getScheduledJob(jobName, groupName);
156                    }
157                    finally {
158                            _readLock.unlock();
159                    }
160            }
161    
162            public List<SchedulerResponse> getScheduledJobs()
163                    throws SchedulerException {
164    
165                    String masterAddressString = getMasterAddressString(false);
166    
167                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
168                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
169                    }
170    
171                    _readLock.lock();
172    
173                    try {
174                            return _schedulerEngine.getScheduledJobs();
175                    }
176                    finally {
177                            _readLock.unlock();
178                    }
179            }
180    
181            public List<SchedulerResponse> getScheduledJobs(String groupName)
182                    throws SchedulerException {
183    
184                    StorageType storageType = getStorageType(groupName);
185    
186                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
187                            String masterAddressString = getMasterAddressString(false);
188    
189                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
190                                    return callMaster(
191                                            masterAddressString, _getScheduledJobsMethodKey2,
192                                            groupName);
193                            }
194                    }
195    
196                    _readLock.lock();
197    
198                    try {
199                            return _schedulerEngine.getScheduledJobs(groupName);
200                    }
201                    finally {
202                            _readLock.unlock();
203                    }
204            }
205    
206            public void initialize() throws SchedulerException {
207                    try {
208                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
209    
210                            _readLock = readWriteLock.readLock();
211                            _writeLock = readWriteLock.writeLock();
212    
213                            _localClusterNodeAddress = getSerializedString(
214                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
215    
216                            _clusterEventListener = new MemorySchedulerClusterEventListener();
217    
218                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
219    
220                            String masterAddressString = getMasterAddressString(false);
221    
222                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
223                                    List<SchedulerResponse> schedulerResponses = callMaster(
224                                            masterAddressString, _getScheduledJobsMethodKey3,
225                                            StorageType.MEMORY_CLUSTERED);
226    
227                                    initMemoryClusteredJobs(schedulerResponses);
228                            }
229                    }
230                    catch (Exception e) {
231                            throw new SchedulerException("Unable to initialize scheduler", e);
232                    }
233            }
234    
235            @Clusterable
236            public void pause(String groupName) throws SchedulerException {
237                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
238    
239                    _readLock.lock();
240    
241                    try {
242                            if (memoryClusteredSlaveJob) {
243                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
244    
245                                    return;
246                            }
247    
248                            _schedulerEngine.pause(groupName);
249                    }
250                    finally {
251                            _readLock.unlock();
252                    }
253    
254                    skipClusterInvoking(groupName);
255            }
256    
257            @Clusterable
258            public void pause(String jobName, String groupName)
259                    throws SchedulerException {
260    
261                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
262    
263                    _readLock.lock();
264    
265                    try {
266                            if (memoryClusteredSlaveJob) {
267                                    updateMemoryClusteredJob(
268                                            jobName, groupName, TriggerState.PAUSED);
269    
270                                    return;
271                            }
272    
273                            _schedulerEngine.pause(jobName, groupName);
274                    }
275                    finally {
276                            _readLock.unlock();
277                    }
278    
279                    skipClusterInvoking(groupName);
280            }
281    
282            @Clusterable
283            public void resume(String groupName) throws SchedulerException {
284                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
285    
286                    _readLock.lock();
287    
288                    try {
289                            if (memoryClusteredSlaveJob) {
290                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
291    
292                                    return;
293                            }
294    
295                            _schedulerEngine.resume(groupName);
296                    }
297                    finally {
298                            _readLock.unlock();
299                    }
300    
301                    skipClusterInvoking(groupName);
302            }
303    
304            @Clusterable
305            public void resume(String jobName, String groupName)
306                    throws SchedulerException {
307    
308                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
309    
310                    _readLock.lock();
311    
312                    try {
313                            if (memoryClusteredSlaveJob) {
314                                    updateMemoryClusteredJob(
315                                            jobName, groupName, TriggerState.NORMAL);
316    
317                                    return;
318                            }
319    
320                            _schedulerEngine.resume(jobName, groupName);
321                    }
322                    finally {
323                            _readLock.unlock();
324                    }
325    
326                    skipClusterInvoking(groupName);
327            }
328    
329            @Clusterable
330            public void schedule(
331                            Trigger trigger, String description, String destinationName,
332                            Message message)
333                    throws SchedulerException {
334    
335                    String groupName = trigger.getGroupName();
336                    String jobName = trigger.getJobName();
337    
338                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
339    
340                    _readLock.lock();
341    
342                    try {
343                            if (memoryClusteredSlaveJob) {
344                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
345    
346                                    schedulerResponse.setDescription(description);
347                                    schedulerResponse.setDestinationName(destinationName);
348                                    schedulerResponse.setGroupName(groupName);
349                                    schedulerResponse.setJobName(jobName);
350                                    schedulerResponse.setMessage(message);
351                                    schedulerResponse.setTrigger(trigger);
352    
353                                    _memoryClusteredJobs.put(
354                                            getFullName(jobName, groupName),
355                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
356                                                    schedulerResponse, TriggerState.NORMAL));
357    
358                                    return;
359                            }
360    
361                            _schedulerEngine.schedule(
362                                    trigger, description, destinationName, message);
363                    }
364                    finally {
365                            _readLock.unlock();
366                    }
367    
368                    skipClusterInvoking(groupName);
369            }
370    
371            public void setBeanIdentifier(String beanIdentifier) {
372                    _beanIdentifier = beanIdentifier;
373            }
374    
375            public void shutdown() throws SchedulerException {
376                    try {
377                            ClusterExecutorUtil.removeClusterEventListener(
378                                    _clusterEventListener);
379    
380                            LockLocalServiceUtil.unlock(
381                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
382                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
383                    }
384                    catch (Exception e) {
385                            throw new SchedulerException("Unable to shutdown scheduler", e);
386                    }
387    
388                    _schedulerEngine.shutdown();
389            }
390    
391            public void start() throws SchedulerException {
392                    _schedulerEngine.start();
393            }
394    
395            @Clusterable
396            public void suppressError(String jobName, String groupName)
397                    throws SchedulerException {
398    
399                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
400    
401                    if (memoryClusteredSlaveJob) {
402                            return;
403                    }
404    
405                    _readLock.lock();
406    
407                    try {
408                            _schedulerEngine.suppressError(jobName, groupName);
409                    }
410                    finally {
411                            _readLock.unlock();
412                    }
413    
414                    skipClusterInvoking(groupName);
415            }
416    
417            @Clusterable
418            public void unschedule(String groupName) throws SchedulerException {
419                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420    
421                    _readLock.lock();
422    
423                    try {
424                            if (memoryClusteredSlaveJob) {
425                                    removeMemoryClusteredJobs(groupName);
426                            }
427                            else {
428                                    _schedulerEngine.unschedule(groupName);
429                            }
430                    }
431                    finally {
432                            _readLock.unlock();
433                    }
434    
435                    skipClusterInvoking(groupName);
436            }
437    
438            @Clusterable
439            public void unschedule(String jobName, String groupName)
440                    throws SchedulerException {
441    
442                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443    
444                    _readLock.lock();
445    
446                    try {
447                            if (memoryClusteredSlaveJob) {
448                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449                            }
450                            else {
451                                    _schedulerEngine.unschedule(jobName, groupName);
452                            }
453                    }
454                    finally {
455                            _readLock.unlock();
456                    }
457    
458                    skipClusterInvoking(groupName);
459            }
460    
461            @Clusterable
462            public void update(Trigger trigger) throws SchedulerException {
463                    String jobName = trigger.getJobName();
464                    String groupName = trigger.getGroupName();
465    
466                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467    
468                    _readLock.lock();
469    
470                    try {
471                            if (memoryClusteredSlaveJob) {
472                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
473                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
474    
475                                            SchedulerResponse schedulerResponse =
476                                                    memoryClusteredJob.getKey();
477    
478                                            if (jobName.equals(schedulerResponse.getJobName()) &&
479                                                    groupName.equals(schedulerResponse.getGroupName())) {
480    
481                                                    schedulerResponse.setTrigger(trigger);
482    
483                                                    return;
484                                            }
485                                    }
486    
487                                    throw new SchedulerException(
488                                            "Unable to update trigger for memory clustered job");
489                            }
490    
491                            _schedulerEngine.update(trigger);
492                    }
493                    finally {
494                            _readLock.unlock();
495                    }
496    
497                    skipClusterInvoking(groupName);
498            }
499    
500            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
501                    getMasterAddressString(false);
502    
503                    return null;
504            }
505    
506            protected <T> T callMaster(
507                            String masterAddressString, MethodKey methodKey,
508                            Object... arguments)
509                    throws SchedulerException {
510    
511                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
512    
513                    Address address = (Address)getDeserializedObject(masterAddressString);
514    
515                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
516                            methodHandler, address);
517    
518                    clusterRequest.setBeanIdentifier(_beanIdentifier);
519    
520                    try {
521                            FutureClusterResponses futureClusterResponses =
522                                    ClusterExecutorUtil.execute(clusterRequest);
523    
524                            ClusterNodeResponses clusterNodeResponses =
525                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
526    
527                            ClusterNodeResponse clusterNodeResponse =
528                                    clusterNodeResponses.getClusterResponse(address);
529    
530                            return (T)clusterNodeResponse.getResult();
531                    }
532                    catch (Exception e) {
533                            throw new SchedulerException(
534                                    "Unable to load scheduled jobs from cluster node " +
535                                            address.getDescription(),
536                                    e);
537                    }
538            }
539    
540            protected Object getDeserializedObject(String string)
541                    throws SchedulerException {
542    
543                    byte[] bytes = Base64.decode(string);
544    
545                    UnsyncByteArrayInputStream byteArrayInputStream =
546                            new UnsyncByteArrayInputStream(bytes);
547    
548                    ObjectInputStream objectInputStream = null;
549    
550                    try {
551                            objectInputStream = new ObjectInputStream(byteArrayInputStream);
552    
553                            Object object = objectInputStream.readObject();
554    
555                            return object;
556                    }
557    
558                    catch(Exception e) {
559                            throw new SchedulerException(
560                                    "Unable to deserialize object from " + string, e);
561                    }
562                    finally {
563                            try {
564                                    objectInputStream.close();
565                            }
566    
567                            catch(Exception e) {
568                            }
569                    }
570            }
571    
572            protected String getFullName(String jobName, String groupName) {
573                    return groupName.concat(StringPool.PERIOD).concat(jobName);
574            }
575    
576            protected String getMasterAddressString(boolean asynchronous)
577                    throws SchedulerException {
578    
579                    String owner = null;
580    
581                    Lock lock = null;
582    
583                    while (true) {
584                            try {
585                                    if (owner == null) {
586                                            lock = LockLocalServiceUtil.lock(
587                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
588                                                    _localClusterNodeAddress,
589                                                    PropsValues.
590                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
591                                    }
592                                    else {
593                                            lock = LockLocalServiceUtil.lock(
594                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
595                                                    _localClusterNodeAddress,
596                                                    PropsValues.
597                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
598                                    }
599    
600                                    Address address = (Address)getDeserializedObject(
601                                            lock.getOwner());
602    
603                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
604                                            break;
605                                    }
606                                    else {
607                                            owner = lock.getOwner();
608                                    }
609                            }
610                            catch (Exception e) {
611                                    if (_log.isWarnEnabled()) {
612                                            _log.warn(
613                                                    "Unable to obtain memory scheduler cluster lock. " +
614                                                            "Trying again.");
615                                    }
616                            }
617                    }
618    
619                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
620    
621                    if (master == _master) {
622                            return lock.getOwner();
623                    }
624    
625                    if (master) {
626                            slaveToMaster();
627                    }
628                    else {
629                            masterToSlave(lock.getOwner(), asynchronous);
630                    }
631    
632                    return lock.getOwner();
633            }
634    
635            protected String getSerializedString(Object object) throws Exception {
636                    UnsyncByteArrayOutputStream byteArrayOutputStream =
637                            new UnsyncByteArrayOutputStream();
638    
639                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
640                            byteArrayOutputStream);
641    
642                    objectOutputStream.writeObject(object);
643                    objectOutputStream.close();
644    
645                    byte[] bytes = byteArrayOutputStream.toByteArray();
646    
647                    return Base64.encode(bytes);
648            }
649    
650            protected StorageType getStorageType(String groupName) {
651                    int pos = groupName.indexOf(CharPool.POUND);
652    
653                    String storageTypeString = groupName.substring(0, pos);
654    
655                    return StorageType.valueOf(storageTypeString);
656            }
657    
658            protected void initMemoryClusteredJobs(
659                            List<SchedulerResponse> schedulerResponses)
660                    throws Exception {
661    
662                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
663                            Trigger oldTrigger = schedulerResponse.getTrigger();
664    
665                            String jobName = schedulerResponse.getJobName();
666                            String groupName = SchedulerEngineUtil.namespaceGroupName(
667                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
668    
669                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
670                                    oldTrigger.getTriggerType(), jobName, groupName,
671                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
672                                    oldTrigger.getTriggerContent());
673    
674                            schedulerResponse.setTrigger(newTrigger);
675    
676                            TriggerState triggerState = SchedulerEngineUtil.getJobState(
677                                    schedulerResponse);
678    
679                            Message message = schedulerResponse.getMessage();
680    
681                            message.remove(JOB_STATE);
682    
683                            _memoryClusteredJobs.put(
684                                    getFullName(jobName, groupName),
685                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
686                                            schedulerResponse, triggerState));
687                    }
688            }
689    
690            protected boolean isMemoryClusteredSlaveJob(String groupName)
691                    throws SchedulerException {
692    
693                    StorageType storageType = getStorageType(groupName);
694    
695                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
696                            return false;
697                    }
698    
699                    String masterAddressString = getMasterAddressString(false);
700    
701                    if (_localClusterNodeAddress.equals(masterAddressString)) {
702                            return false;
703                    }
704    
705                    return true;
706            }
707    
708            protected void masterToSlave(
709                            String masterAddressString, boolean asynchronous)
710                    throws SchedulerException {
711    
712                    if (asynchronous) {
713                            MethodHandler methodHandler = new MethodHandler(
714                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
715    
716                            Address address = (Address)getDeserializedObject(
717                                    masterAddressString);
718    
719                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
720                                    methodHandler, address);
721    
722                            clusterRequest.setBeanIdentifier(_beanIdentifier);
723    
724                            try {
725                                    ClusterExecutorUtil.execute(
726                                            clusterRequest,
727                                            new MemorySchedulerClusterResponseCallback(address), 20,
728                                            TimeUnit.SECONDS);
729    
730                                    return;
731                            }
732                            catch (Exception e) {
733                                    throw new SchedulerException(
734                                            "Unable to load scheduled jobs from cluster node " +
735                                                    address.getDescription(),
736                                            e);
737                            }
738                    }
739    
740                    List<SchedulerResponse> schedulerResponses = callMaster(
741                            masterAddressString, _getScheduledJobsMethodKey3,
742                            StorageType.MEMORY_CLUSTERED);
743    
744                    _doMasterToSlave(schedulerResponses);
745            }
746    
747            protected void removeMemoryClusteredJobs(String groupName) {
748                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
749                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
750    
751                    Iterator
752                            <Map.Entry<String,
753                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
754                                            memoryClusteredJobs.iterator();
755    
756                    while (itr.hasNext()) {
757                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
758                                    entry = itr.next();
759    
760                            ObjectValuePair<SchedulerResponse, TriggerState>
761                                    memoryClusteredJob = entry.getValue();
762    
763                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
764    
765                            if (groupName.equals(schedulerResponse.getGroupName())) {
766                                    itr.remove();
767                            }
768                    }
769            }
770    
771            protected void skipClusterInvoking(String groupName)
772                    throws SchedulerException {
773    
774                    StorageType storageType = getStorageType(groupName);
775    
776                    if (storageType.equals(StorageType.PERSISTED) ||
777                            PluginContextLifecycleThreadLocal.isDestroying()) {
778    
779                            SchedulerException schedulerException = new SchedulerException();
780    
781                            schedulerException.setSwallowable(true);
782    
783                            throw schedulerException;
784                    }
785            }
786    
787            protected void slaveToMaster() throws SchedulerException {
788                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
789    
790                    ProxyModeThreadLocal.setForceSync(true);
791    
792                    _writeLock.lock();
793    
794                    try {
795                            for (ObjectValuePair<SchedulerResponse, TriggerState>
796                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
797    
798                                    SchedulerResponse schedulerResponse =
799                                            memoryClusteredJob.getKey();
800    
801                                    _schedulerEngine.schedule(
802                                            schedulerResponse.getTrigger(),
803                                            schedulerResponse.getDescription(),
804                                            schedulerResponse.getDestinationName(),
805                                            schedulerResponse.getMessage());
806    
807                                    TriggerState triggerState = memoryClusteredJob.getValue();
808    
809                                    if (triggerState.equals(TriggerState.PAUSED)) {
810                                            _schedulerEngine.pause(
811                                                    schedulerResponse.getJobName(),
812                                                    schedulerResponse.getGroupName());
813                                    }
814                            }
815    
816                            _memoryClusteredJobs.clear();
817                    }
818                    finally {
819                            ProxyModeThreadLocal.setForceSync(forceSync);
820    
821                            _master = true;
822    
823                            _writeLock.unlock();
824                    }
825            }
826    
827            protected void updateMemoryClusteredJob(
828                    String jobName, String groupName, TriggerState triggerState) {
829    
830                    ObjectValuePair<SchedulerResponse, TriggerState>
831                            memoryClusteredJob = _memoryClusteredJobs.get(
832                                    getFullName(jobName, groupName));
833    
834                    if (memoryClusteredJob != null) {
835                            memoryClusteredJob.setValue(triggerState);
836                    }
837            }
838    
839            protected void updateMemoryClusteredJobs(
840                    String groupName, TriggerState triggerState) {
841    
842                    for (ObjectValuePair<SchedulerResponse, TriggerState>
843                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
844    
845                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
846    
847                            if (groupName.equals(schedulerResponse.getGroupName())) {
848                                    memoryClusteredJob.setValue(triggerState);
849                            }
850                    }
851            }
852    
        /**
         * Scheduler engine referenced by the bean name
         * <code>com.liferay.portal.scheduler.ClusterSchedulerEngineService</code>.
         *
         * <p>
         * NOTE(review): the methods visible in this part of the class use the
         * private <code>_schedulerEngine</code> field rather than this one;
         * confirm where this reference is consumed.
         * </p>
         */
        @BeanReference(
                name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;
856    
857            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
858                    throws SchedulerException {
859    
860                    _writeLock.lock();
861    
862                    try {
863                            for (SchedulerResponse schedulerResponse :
864                                            _schedulerEngine.getScheduledJobs()) {
865    
866                                    if (StorageType.MEMORY_CLUSTERED ==
867                                                    schedulerResponse.getStorageType()) {
868    
869                                            _schedulerEngine.delete(
870                                                    schedulerResponse.getJobName(),
871                                                    schedulerResponse.getGroupName());
872                                    }
873                            }
874    
875                            initMemoryClusteredJobs(schedulerResponses);
876    
877                            if (_log.isInfoEnabled()) {
878                                    _log.info("Switched current node from master to slave");
879                            }
880                    }
881                    catch (Exception e) {
882                            throw new SchedulerException(e);
883                    }
884                    finally {
885                            _master = false;
886    
887                            _writeLock.unlock();
888                    }
889            }
890    
        // Fully qualified name of the SchedulerEngine interface.
        // NOTE(review): presumably the class name under which the cluster master
        // lock is held; its use is outside this part of the file.
        private static final String _LOCK_CLASS_NAME =
                SchedulerEngine.class.getName();

        // Logger of this class
        private static Log _log = LogFactoryUtil.getLog(
                ClusterSchedulerEngine.class);

        // Method keys describing SchedulerEngine.getScheduledJob(String, String),
        // SchedulerEngine.getScheduledJobs() and
        // SchedulerEngine.getScheduledJobs(String)
        private static MethodKey _getScheduledJobMethodKey = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJob", String.class,
                String.class);
        private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs");
        private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs", String.class);

        // Method key describing SchedulerEngineUtil.getScheduledJobs(StorageType);
        // masterToSlave uses it to fetch the memory clustered jobs from the master
        private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
                SchedulerEngineUtil.class.getName(), "getScheduledJobs",
                StorageType.class);

        // Bean identifier set on the cluster requests sent by masterToSlave
        private String _beanIdentifier;

        // NOTE(review): presumably the listener registered for cluster events;
        // registration is outside this part of the file
        private ClusterEventListener _clusterEventListener;

        // Address string of the local node, compared against the master address
        // string to tell master from slave
        private volatile String _localClusterNodeAddress;

        // Set to true at the end of slaveToMaster and to false at the end of
        // _doMasterToSlave
        private volatile boolean _master;

        // Locally tracked memory clustered jobs, keyed by the full name built
        // from job name and group name; each value pairs the scheduler response
        // with the job's trigger state
        private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
                _memoryClusteredJobs = new ConcurrentHashMap
                        <String, ObjectValuePair<SchedulerResponse, TriggerState>>();

        // NOTE(review): presumably the read half of the lock whose write half is
        // _writeLock; initialization is outside this part of the file
        private java.util.concurrent.locks.Lock _readLock;

        // Underlying scheduler engine on which jobs are scheduled, paused and
        // deleted
        private SchedulerEngine _schedulerEngine;

        // Lock held by slaveToMaster and _doMasterToSlave while switching roles
        private java.util.concurrent.locks.Lock _writeLock;
918    
919            private class MemorySchedulerClusterEventListener
920                    implements ClusterEventListener {
921    
922                    public void processClusterEvent(ClusterEvent clusterEvent) {
923                            try {
924                                    getMasterAddressString(true);
925                            }
926                            catch (Exception e) {
927                                    _log.error("Unable to update memory scheduler cluster lock", e);
928                            }
929                    }
930    
931            }
932    
933            private class MemorySchedulerClusterResponseCallback
934                    extends BaseClusterResponseCallback {
935    
936                    public MemorySchedulerClusterResponseCallback(Address address) {
937                            _address = address;
938                    }
939    
940                    @Override
941                    public void callback(ClusterNodeResponses clusterNodeResponses) {
942                            try {
943                                    ClusterNodeResponse clusterNodeResponse =
944                                            clusterNodeResponses.getClusterResponse(_address);
945    
946                                    List<SchedulerResponse> schedulerResponses =
947                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
948    
949                                    _doMasterToSlave(schedulerResponses);
950                            }
951                            catch (Exception e) {
952                                    _log.error(
953                                            "Unable to load memory clustered jobs from cluster node " +
954                                                    _address.getDescription(),
955                                            e);
956                            }
957                    }
958    
959                    @Override
960                    public void processTimeoutException(TimeoutException timeoutException) {
961                            _log.error(
962                                    "Unable to load memory clustered jobs from cluster node " +
963                                            _address.getDescription(),
964                                    timeoutException);
965                    }
966    
967                    private Address _address;
968    
969            }
970    
971    }