001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.IdentifiableBean;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
021    import com.liferay.portal.kernel.cluster.ClusterEvent;
022    import com.liferay.portal.kernel.cluster.ClusterEventListener;
023    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.Clusterable;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.messaging.Message;
034    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
036    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
037    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
038    import com.liferay.portal.kernel.scheduler.SchedulerException;
039    import com.liferay.portal.kernel.scheduler.StorageType;
040    import com.liferay.portal.kernel.scheduler.Trigger;
041    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
042    import com.liferay.portal.kernel.scheduler.TriggerState;
043    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
044    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
045    import com.liferay.portal.kernel.util.Base64;
046    import com.liferay.portal.kernel.util.CharPool;
047    import com.liferay.portal.kernel.util.MethodHandler;
048    import com.liferay.portal.kernel.util.MethodKey;
049    import com.liferay.portal.kernel.util.ObjectValuePair;
050    import com.liferay.portal.kernel.util.StringPool;
051    import com.liferay.portal.model.Lock;
052    import com.liferay.portal.service.LockLocalServiceUtil;
053    import com.liferay.portal.util.PropsValues;
054    
055    import java.io.ObjectInputStream;
056    import java.io.ObjectOutputStream;
057    
058    import java.util.Iterator;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.concurrent.ConcurrentHashMap;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.TimeoutException;
065    import java.util.concurrent.locks.ReadWriteLock;
066    import java.util.concurrent.locks.ReentrantReadWriteLock;
067    
068    /**
069     * @author Tina Tian
070     */
071    public class ClusterSchedulerEngine
072            implements IdentifiableBean, SchedulerEngine,
073                               SchedulerEngineClusterManager {
074    
075            public static SchedulerEngine createClusterSchedulerEngine(
076                    SchedulerEngine schedulerEngine) {
077    
078                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080                    }
081    
082                    return schedulerEngine;
083            }
084    
085            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
086                    _schedulerEngine = schedulerEngine;
087            }
088    
089            @Clusterable
090            public void delete(String groupName) throws SchedulerException {
091                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
092    
093                    _readLock.lock();
094    
095                    try {
096                            if (memoryClusteredSlaveJob) {
097                                    removeMemoryClusteredJobs(groupName);
098                            }
099                            else {
100                                    _schedulerEngine.delete(groupName);
101                            }
102                    }
103                    finally {
104                            _readLock.unlock();
105                    }
106    
107                    skipClusterInvoking(groupName);
108            }
109    
110            @Clusterable
111            public void delete(String jobName, String groupName)
112                    throws SchedulerException {
113    
114                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
115    
116                    _readLock.lock();
117    
118                    try {
119                            if (memoryClusteredSlaveJob) {
120                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
121                            }
122                            else {
123                                    _schedulerEngine.delete(jobName, groupName);
124                            }
125                    }
126                    finally {
127                            _readLock.unlock();
128                    }
129    
130                    skipClusterInvoking(groupName);
131            }
132    
133            public String getBeanIdentifier() {
134                    return _beanIdentifier;
135            }
136    
137            public SchedulerResponse getScheduledJob(String jobName, String groupName)
138                    throws SchedulerException {
139    
140                    StorageType storageType = getStorageType(groupName);
141    
142                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
143                            String masterAddressString = getMasterAddressString(false);
144    
145                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
146                                    return (SchedulerResponse)callMaster(
147                                            masterAddressString, _getScheduledJobMethodKey, jobName,
148                                            groupName);
149                            }
150                    }
151    
152                    _readLock.lock();
153    
154                    try {
155                            return _schedulerEngine.getScheduledJob(jobName, groupName);
156                    }
157                    finally {
158                            _readLock.unlock();
159                    }
160            }
161    
162            public List<SchedulerResponse> getScheduledJobs()
163                    throws SchedulerException {
164    
165                    String masterAddressString = getMasterAddressString(false);
166    
167                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
168                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
169                    }
170    
171                    _readLock.lock();
172    
173                    try {
174                            return _schedulerEngine.getScheduledJobs();
175                    }
176                    finally {
177                            _readLock.unlock();
178                    }
179            }
180    
181            public List<SchedulerResponse> getScheduledJobs(String groupName)
182                    throws SchedulerException {
183    
184                    StorageType storageType = getStorageType(groupName);
185    
186                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
187                            String masterAddressString = getMasterAddressString(false);
188    
189                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
190                                    return callMaster(
191                                            masterAddressString, _getScheduledJobsMethodKey2,
192                                            groupName);
193                            }
194                    }
195    
196                    _readLock.lock();
197    
198                    try {
199                            return _schedulerEngine.getScheduledJobs(groupName);
200                    }
201                    finally {
202                            _readLock.unlock();
203                    }
204            }
205    
206            public void initialize() throws SchedulerException {
207                    try {
208                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
209    
210                            _readLock = readWriteLock.readLock();
211                            _writeLock = readWriteLock.writeLock();
212    
213                            _localClusterNodeAddress = getSerializedString(
214                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
215    
216                            _clusterEventListener = new MemorySchedulerClusterEventListener();
217    
218                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
219    
220                            String masterAddressString = getMasterAddressString(false);
221    
222                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
223                                    List<SchedulerResponse> schedulerResponses = callMaster(
224                                            masterAddressString, _getScheduledJobsMethodKey3,
225                                            StorageType.MEMORY_CLUSTERED);
226    
227                                    initMemoryClusteredJobs(schedulerResponses);
228                            }
229                    }
230                    catch (Exception e) {
231                            throw new SchedulerException("Unable to initialize scheduler", e);
232                    }
233            }
234    
235            @Clusterable
236            public void pause(String groupName) throws SchedulerException {
237                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
238    
239                    _readLock.lock();
240    
241                    try {
242                            if (memoryClusteredSlaveJob) {
243                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
244    
245                                    return;
246                            }
247    
248                            _schedulerEngine.pause(groupName);
249                    }
250                    finally {
251                            _readLock.unlock();
252                    }
253    
254                    skipClusterInvoking(groupName);
255            }
256    
257            @Clusterable
258            public void pause(String jobName, String groupName)
259                    throws SchedulerException {
260    
261                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
262    
263                    _readLock.lock();
264    
265                    try {
266                            if (memoryClusteredSlaveJob) {
267                                    updateMemoryClusteredJob(
268                                            jobName, groupName, TriggerState.PAUSED);
269    
270                                    return;
271                            }
272    
273                            _schedulerEngine.pause(jobName, groupName);
274                    }
275                    finally {
276                            _readLock.unlock();
277                    }
278    
279                    skipClusterInvoking(groupName);
280            }
281    
282            @Clusterable
283            public void resume(String groupName) throws SchedulerException {
284                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
285    
286                    _readLock.lock();
287    
288                    try {
289                            if (memoryClusteredSlaveJob) {
290                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
291    
292                                    return;
293                            }
294    
295                            _schedulerEngine.resume(groupName);
296                    }
297                    finally {
298                            _readLock.unlock();
299                    }
300    
301                    skipClusterInvoking(groupName);
302            }
303    
304            @Clusterable
305            public void resume(String jobName, String groupName)
306                    throws SchedulerException {
307    
308                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
309    
310                    _readLock.lock();
311    
312                    try {
313                            if (memoryClusteredSlaveJob) {
314                                    updateMemoryClusteredJob(
315                                            jobName, groupName, TriggerState.NORMAL);
316    
317                                    return;
318                            }
319    
320                            _schedulerEngine.resume(jobName, groupName);
321                    }
322                    finally {
323                            _readLock.unlock();
324                    }
325    
326                    skipClusterInvoking(groupName);
327            }
328    
329            @Clusterable
330            public void schedule(
331                            Trigger trigger, String description, String destinationName,
332                            Message message)
333                    throws SchedulerException {
334    
335                    String groupName = trigger.getGroupName();
336                    String jobName = trigger.getJobName();
337    
338                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
339    
340                    _readLock.lock();
341    
342                    try {
343                            if (memoryClusteredSlaveJob) {
344                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
345    
346                                    schedulerResponse.setDescription(description);
347                                    schedulerResponse.setDestinationName(destinationName);
348                                    schedulerResponse.setGroupName(groupName);
349                                    schedulerResponse.setJobName(jobName);
350                                    schedulerResponse.setMessage(message);
351                                    schedulerResponse.setTrigger(trigger);
352    
353                                    _memoryClusteredJobs.put(
354                                            getFullName(jobName, groupName),
355                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
356                                                    schedulerResponse, TriggerState.NORMAL));
357    
358                                    return;
359                            }
360    
361                            _schedulerEngine.schedule(
362                                    trigger, description, destinationName, message);
363                    }
364                    finally {
365                            _readLock.unlock();
366                    }
367    
368                    skipClusterInvoking(groupName);
369            }
370    
371            public void setBeanIdentifier(String beanIdentifier) {
372                    _beanIdentifier = beanIdentifier;
373            }
374    
375            public void shutdown() throws SchedulerException {
376                    try {
377                            ClusterExecutorUtil.removeClusterEventListener(
378                                    _clusterEventListener);
379    
380                            LockLocalServiceUtil.unlock(
381                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
382                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
383                    }
384                    catch (Exception e) {
385                            throw new SchedulerException("Unable to shutdown scheduler", e);
386                    }
387    
388                    _schedulerEngine.shutdown();
389            }
390    
391            public void start() throws SchedulerException {
392                    _schedulerEngine.start();
393            }
394    
395            @Clusterable
396            public void suppressError(String jobName, String groupName)
397                    throws SchedulerException {
398    
399                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
400    
401                    if (memoryClusteredSlaveJob) {
402                            return;
403                    }
404    
405                    _readLock.lock();
406    
407                    try {
408                            _schedulerEngine.suppressError(jobName, groupName);
409                    }
410                    finally {
411                            _readLock.unlock();
412                    }
413    
414                    skipClusterInvoking(groupName);
415            }
416    
417            @Clusterable
418            public void unschedule(String groupName) throws SchedulerException {
419                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420    
421                    _readLock.lock();
422    
423                    try {
424                            if (memoryClusteredSlaveJob) {
425                                    removeMemoryClusteredJobs(groupName);
426                            }
427                            else {
428                                    _schedulerEngine.unschedule(groupName);
429                            }
430                    }
431                    finally {
432                            _readLock.unlock();
433                    }
434    
435                    skipClusterInvoking(groupName);
436            }
437    
438            @Clusterable
439            public void unschedule(String jobName, String groupName)
440                    throws SchedulerException {
441    
442                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443    
444                    _readLock.lock();
445    
446                    try {
447                            if (memoryClusteredSlaveJob) {
448                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
449                            }
450                            else {
451                                    _schedulerEngine.unschedule(jobName, groupName);
452                            }
453                    }
454                    finally {
455                            _readLock.unlock();
456                    }
457    
458                    skipClusterInvoking(groupName);
459            }
460    
461            @Clusterable
462            public void update(Trigger trigger) throws SchedulerException {
463                    String jobName = trigger.getJobName();
464                    String groupName = trigger.getGroupName();
465    
466                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467    
468                    _readLock.lock();
469    
470                    try {
471                            if (memoryClusteredSlaveJob) {
472                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
473                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
474    
475                                            SchedulerResponse schedulerResponse =
476                                                    memoryClusteredJob.getKey();
477    
478                                            if (jobName.equals(schedulerResponse.getJobName()) &&
479                                                    groupName.equals(schedulerResponse.getGroupName())) {
480    
481                                                    schedulerResponse.setTrigger(trigger);
482    
483                                                    return;
484                                            }
485                                    }
486    
487                                    throw new SchedulerException(
488                                            "Unable to update trigger for memory clustered job");
489                            }
490    
491                            _schedulerEngine.update(trigger);
492                    }
493                    finally {
494                            _readLock.unlock();
495                    }
496    
497                    skipClusterInvoking(groupName);
498            }
499    
500            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
501                    getMasterAddressString(false);
502    
503                    return null;
504            }
505    
506            protected <T> T callMaster(
507                            String masterAddressString, MethodKey methodKey,
508                            Object... arguments)
509                    throws SchedulerException {
510    
511                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
512    
513                    Address address = (Address)getDeserializedObject(masterAddressString);
514    
515                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
516                            methodHandler, address);
517    
518                    clusterRequest.setBeanIdentifier(_beanIdentifier);
519    
520                    try {
521                            FutureClusterResponses futureClusterResponses =
522                                    ClusterExecutorUtil.execute(clusterRequest);
523    
524                            ClusterNodeResponses clusterNodeResponses =
525                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
526    
527                            ClusterNodeResponse clusterNodeResponse =
528                                    clusterNodeResponses.getClusterResponse(address);
529    
530                            return (T)clusterNodeResponse.getResult();
531                    }
532                    catch (Exception e) {
533                            throw new SchedulerException(
534                                    "Unable to load scheduled jobs from cluster node " +
535                                            address.getDescription(),
536                                    e);
537                    }
538            }
539    
540            protected Object getDeserializedObject(String string)
541                    throws SchedulerException {
542    
543                    byte[] bytes = Base64.decode(string);
544    
545                    UnsyncByteArrayInputStream byteArrayInputStream =
546                            new UnsyncByteArrayInputStream(bytes);
547    
548                    ObjectInputStream objectInputStream = null;
549    
550                    try {
551                            objectInputStream = new ObjectInputStream(byteArrayInputStream);
552    
553                            Object object = objectInputStream.readObject();
554    
555                            return object;
556                    }
557                    catch (Exception e) {
558                            throw new SchedulerException(
559                                    "Unable to deserialize object from " + string, e);
560                    }
561                    finally {
562                            try {
563                                    objectInputStream.close();
564                            }
565                            catch (Exception e) {
566                            }
567                    }
568            }
569    
570            protected String getFullName(String jobName, String groupName) {
571                    return groupName.concat(StringPool.PERIOD).concat(jobName);
572            }
573    
574            protected String getMasterAddressString(boolean asynchronous)
575                    throws SchedulerException {
576    
577                    String owner = null;
578    
579                    Lock lock = null;
580    
581                    while (true) {
582                            try {
583                                    if (owner == null) {
584                                            lock = LockLocalServiceUtil.lock(
585                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
586                                                    _localClusterNodeAddress,
587                                                    PropsValues.
588                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
589                                    }
590                                    else {
591                                            lock = LockLocalServiceUtil.lock(
592                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
593                                                    _localClusterNodeAddress,
594                                                    PropsValues.
595                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
596                                    }
597    
598                                    Address address = (Address)getDeserializedObject(
599                                            lock.getOwner());
600    
601                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
602                                            break;
603                                    }
604                                    else {
605                                            owner = lock.getOwner();
606                                    }
607                            }
608                            catch (Exception e) {
609                                    if (_log.isWarnEnabled()) {
610                                            _log.warn(
611                                                    "Unable to obtain memory scheduler cluster lock. " +
612                                                            "Trying again.");
613                                    }
614                            }
615                    }
616    
617                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
618    
619                    if (master == _master) {
620                            return lock.getOwner();
621                    }
622    
623                    if (master) {
624                            slaveToMaster();
625                    }
626                    else {
627                            masterToSlave(lock.getOwner(), asynchronous);
628                    }
629    
630                    return lock.getOwner();
631            }
632    
633            protected String getSerializedString(Object object) throws Exception {
634                    UnsyncByteArrayOutputStream byteArrayOutputStream =
635                            new UnsyncByteArrayOutputStream();
636    
637                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
638                            byteArrayOutputStream);
639    
640                    objectOutputStream.writeObject(object);
641                    objectOutputStream.close();
642    
643                    byte[] bytes = byteArrayOutputStream.toByteArray();
644    
645                    return Base64.encode(bytes);
646            }
647    
648            protected StorageType getStorageType(String groupName) {
649                    int pos = groupName.indexOf(CharPool.POUND);
650    
651                    String storageTypeString = groupName.substring(0, pos);
652    
653                    return StorageType.valueOf(storageTypeString);
654            }
655    
656            protected void initMemoryClusteredJobs(
657                            List<SchedulerResponse> schedulerResponses)
658                    throws Exception {
659    
660                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
661                            Trigger oldTrigger = schedulerResponse.getTrigger();
662    
663                            String jobName = schedulerResponse.getJobName();
664                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
665                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
666    
667                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
668                                    oldTrigger.getTriggerType(), jobName, groupName,
669                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
670                                    oldTrigger.getTriggerContent());
671    
672                            schedulerResponse.setTrigger(newTrigger);
673    
674                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
675                                    schedulerResponse);
676    
677                            Message message = schedulerResponse.getMessage();
678    
679                            message.remove(JOB_STATE);
680    
681                            _memoryClusteredJobs.put(
682                                    getFullName(jobName, groupName),
683                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
684                                            schedulerResponse, triggerState));
685                    }
686            }
687    
688            protected boolean isMemoryClusteredSlaveJob(String groupName)
689                    throws SchedulerException {
690    
691                    StorageType storageType = getStorageType(groupName);
692    
693                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
694                            return false;
695                    }
696    
697                    String masterAddressString = getMasterAddressString(false);
698    
699                    if (_localClusterNodeAddress.equals(masterAddressString)) {
700                            return false;
701                    }
702    
703                    return true;
704            }
705    
706            protected void masterToSlave(
707                            String masterAddressString, boolean asynchronous)
708                    throws SchedulerException {
709    
710                    if (asynchronous) {
711                            MethodHandler methodHandler = new MethodHandler(
712                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
713    
714                            Address address = (Address)getDeserializedObject(
715                                    masterAddressString);
716    
717                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
718                                    methodHandler, address);
719    
720                            clusterRequest.setBeanIdentifier(_beanIdentifier);
721    
722                            try {
723                                    ClusterExecutorUtil.execute(
724                                            clusterRequest,
725                                            new MemorySchedulerClusterResponseCallback(address), 20,
726                                            TimeUnit.SECONDS);
727    
728                                    return;
729                            }
730                            catch (Exception e) {
731                                    throw new SchedulerException(
732                                            "Unable to load scheduled jobs from cluster node " +
733                                                    address.getDescription(),
734                                            e);
735                            }
736                    }
737    
738                    List<SchedulerResponse> schedulerResponses = callMaster(
739                            masterAddressString, _getScheduledJobsMethodKey3,
740                            StorageType.MEMORY_CLUSTERED);
741    
742                    _doMasterToSlave(schedulerResponses);
743            }
744    
745            protected void removeMemoryClusteredJobs(String groupName) {
746                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
747                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
748    
749                    Iterator
750                            <Map.Entry<String,
751                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
752                                            memoryClusteredJobs.iterator();
753    
754                    while (itr.hasNext()) {
755                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
756                                    entry = itr.next();
757    
758                            ObjectValuePair<SchedulerResponse, TriggerState>
759                                    memoryClusteredJob = entry.getValue();
760    
761                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
762    
763                            if (groupName.equals(schedulerResponse.getGroupName())) {
764                                    itr.remove();
765                            }
766                    }
767            }
768    
769            protected void skipClusterInvoking(String groupName)
770                    throws SchedulerException {
771    
772                    StorageType storageType = getStorageType(groupName);
773    
774                    if (storageType.equals(StorageType.PERSISTED) ||
775                            PluginContextLifecycleThreadLocal.isDestroying()) {
776    
777                            SchedulerException schedulerException = new SchedulerException();
778    
779                            schedulerException.setSwallowable(true);
780    
781                            throw schedulerException;
782                    }
783            }
784    
785            protected void slaveToMaster() throws SchedulerException {
786                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
787    
788                    ProxyModeThreadLocal.setForceSync(true);
789    
790                    _writeLock.lock();
791    
792                    try {
793                            for (ObjectValuePair<SchedulerResponse, TriggerState>
794                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
795    
796                                    SchedulerResponse schedulerResponse =
797                                            memoryClusteredJob.getKey();
798    
799                                    _schedulerEngine.schedule(
800                                            schedulerResponse.getTrigger(),
801                                            schedulerResponse.getDescription(),
802                                            schedulerResponse.getDestinationName(),
803                                            schedulerResponse.getMessage());
804    
805                                    TriggerState triggerState = memoryClusteredJob.getValue();
806    
807                                    if (triggerState.equals(TriggerState.PAUSED)) {
808                                            _schedulerEngine.pause(
809                                                    schedulerResponse.getJobName(),
810                                                    schedulerResponse.getGroupName());
811                                    }
812                            }
813    
814                            _memoryClusteredJobs.clear();
815                    }
816                    finally {
817                            ProxyModeThreadLocal.setForceSync(forceSync);
818    
819                            _master = true;
820    
821                            _writeLock.unlock();
822                    }
823            }
824    
825            protected void updateMemoryClusteredJob(
826                    String jobName, String groupName, TriggerState triggerState) {
827    
828                    ObjectValuePair<SchedulerResponse, TriggerState>
829                            memoryClusteredJob = _memoryClusteredJobs.get(
830                                    getFullName(jobName, groupName));
831    
832                    if (memoryClusteredJob != null) {
833                            memoryClusteredJob.setValue(triggerState);
834                    }
835            }
836    
837            protected void updateMemoryClusteredJobs(
838                    String groupName, TriggerState triggerState) {
839    
840                    for (ObjectValuePair<SchedulerResponse, TriggerState>
841                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
842    
843                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
844    
845                            if (groupName.equals(schedulerResponse.getGroupName())) {
846                                    memoryClusteredJob.setValue(triggerState);
847                            }
848                    }
849            }
850    
        /**
         * Scheduler engine bean reference, resolved by the bean name
         * <code>com.liferay.portal.scheduler.ClusterSchedulerEngineService</code>.
         *
         * <p>
         * NOTE(review): the nearby methods delegate to the private
         * <code>_schedulerEngine</code> field rather than to this one; confirm how
         * the two relate before changing either.
         * </p>
         */
        @BeanReference(
                name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;
854    
855            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
856                    throws SchedulerException {
857    
858                    _writeLock.lock();
859    
860                    try {
861                            for (SchedulerResponse schedulerResponse :
862                                            _schedulerEngine.getScheduledJobs()) {
863    
864                                    if (StorageType.MEMORY_CLUSTERED ==
865                                                    schedulerResponse.getStorageType()) {
866    
867                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
868    
869                                            groupName = groupName.concat(StringPool.POUND).concat(
870                                                    schedulerResponse.getGroupName());
871    
872                                            _schedulerEngine.delete(
873                                                    schedulerResponse.getJobName(), groupName);
874                                    }
875                            }
876    
877                            initMemoryClusteredJobs(schedulerResponses);
878    
879                            if (_log.isInfoEnabled()) {
880                                    _log.info("Switched current node from master to slave");
881                            }
882                    }
883                    catch (Exception e) {
884                            throw new SchedulerException(e);
885                    }
886                    finally {
887                            _master = false;
888    
889                            _writeLock.unlock();
890                    }
891            }
892    
        // Fully qualified class name of SchedulerEngine. NOTE(review): presumably
        // the key of the cluster lock used for master election; confirm against
        // its usages.

        private static final String _LOCK_CLASS_NAME =
                SchedulerEngine.class.getName();

        private static Log _log = LogFactoryUtil.getLog(
                ClusterSchedulerEngine.class);

        // Method keys describing methods by class name, method name and parameter
        // types. The first three describe SchedulerEngine methods; the fourth
        // describes SchedulerEngineHelperUtil.getScheduledJobs(StorageType) and is
        // the one masterToSlave uses to load jobs from the master node.

        private static MethodKey _getScheduledJobMethodKey = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJob", String.class,
                String.class);
        private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs");
        private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
        private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
                SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
                StorageType.class);

        // Bean identifier set on the cluster request built by masterToSlave

        private String _beanIdentifier;
        private ClusterEventListener _clusterEventListener;

        // Compared with the master address string to tell whether this node is
        // the master (see isMemoryClusteredSlaveJob)

        private volatile String _localClusterNodeAddress;

        // Set to true at the end of slaveToMaster and to false at the end of
        // _doMasterToSlave

        private volatile boolean _master;

        // Local cache of memory clustered jobs, keyed by
        // getFullName(jobName, groupName); each value pairs the scheduler response
        // with its trigger state

        private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
                _memoryClusteredJobs = new ConcurrentHashMap
                        <String, ObjectValuePair<SchedulerResponse, TriggerState>>();

        // NOTE(review): presumably the read side of the lock whose write side is
        // _writeLock; confirm where both are initialized

        private java.util.concurrent.locks.Lock _readLock;

        // Engine that slaveToMaster and _doMasterToSlave schedule, pause, delete
        // and list jobs on

        private SchedulerEngine _schedulerEngine;

        // Held while switching roles in slaveToMaster and _doMasterToSlave

        private java.util.concurrent.locks.Lock _writeLock;
920    
921            private class MemorySchedulerClusterEventListener
922                    implements ClusterEventListener {
923    
924                    public void processClusterEvent(ClusterEvent clusterEvent) {
925                            try {
926                                    getMasterAddressString(true);
927                            }
928                            catch (Exception e) {
929                                    _log.error("Unable to update memory scheduler cluster lock", e);
930                            }
931                    }
932    
933            }
934    
935            private class MemorySchedulerClusterResponseCallback
936                    extends BaseClusterResponseCallback {
937    
938                    public MemorySchedulerClusterResponseCallback(Address address) {
939                            _address = address;
940                    }
941    
942                    @Override
943                    public void callback(ClusterNodeResponses clusterNodeResponses) {
944                            try {
945                                    ClusterNodeResponse clusterNodeResponse =
946                                            clusterNodeResponses.getClusterResponse(_address);
947    
948                                    List<SchedulerResponse> schedulerResponses =
949                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
950    
951                                    _doMasterToSlave(schedulerResponses);
952                            }
953                            catch (Exception e) {
954                                    _log.error(
955                                            "Unable to load memory clustered jobs from cluster node " +
956                                                    _address.getDescription(),
957                                            e);
958                            }
959                    }
960    
961                    @Override
962                    public void processTimeoutException(TimeoutException timeoutException) {
963                            _log.error(
964                                    "Unable to load memory clustered jobs from cluster node " +
965                                            _address.getDescription(),
966                                    timeoutException);
967                    }
968    
969                    private Address _address;
970    
971            }
972    
973    }