001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018    import com.liferay.portal.kernel.bean.BeanReference;
019    import com.liferay.portal.kernel.bean.IdentifiableBean;
020    import com.liferay.portal.kernel.cluster.Address;
021    import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
022    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023    import com.liferay.portal.kernel.cluster.ClusterEvent;
024    import com.liferay.portal.kernel.cluster.ClusterEventListener;
025    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
028    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
029    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
030    import com.liferay.portal.kernel.cluster.ClusterRequest;
031    import com.liferay.portal.kernel.cluster.Clusterable;
032    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Message;
036    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
037    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
038    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
039    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
040    import com.liferay.portal.kernel.scheduler.SchedulerException;
041    import com.liferay.portal.kernel.scheduler.StorageType;
042    import com.liferay.portal.kernel.scheduler.Trigger;
043    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
044    import com.liferay.portal.kernel.scheduler.TriggerState;
045    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
046    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
047    import com.liferay.portal.kernel.util.CharPool;
048    import com.liferay.portal.kernel.util.MethodHandler;
049    import com.liferay.portal.kernel.util.MethodKey;
050    import com.liferay.portal.kernel.util.ObjectValuePair;
051    import com.liferay.portal.kernel.util.StringPool;
052    import com.liferay.portal.model.Lock;
053    import com.liferay.portal.service.LockLocalServiceUtil;
054    import com.liferay.portal.util.PropsValues;
055    
056    import java.io.Serializable;
057    
058    import java.util.Iterator;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.concurrent.ConcurrentHashMap;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.TimeoutException;
065    import java.util.concurrent.locks.ReadWriteLock;
066    import java.util.concurrent.locks.ReentrantReadWriteLock;
067    
068    /**
069     * @author Tina Tian
070     */
071    public class ClusterSchedulerEngine
072            implements IdentifiableBean, SchedulerEngine,
073                               SchedulerEngineClusterManager {
074    
075            public static SchedulerEngine createClusterSchedulerEngine(
076                    SchedulerEngine schedulerEngine) {
077    
078                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
079                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
080                    }
081    
082                    return schedulerEngine;
083            }
084    
085            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
086                    _schedulerEngine = schedulerEngine;
087            }
088    
089            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
090            @Override
091            public void delete(String groupName) throws SchedulerException {
092                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
093    
094                    _readLock.lock();
095    
096                    try {
097                            if (memoryClusteredSlaveJob) {
098                                    removeMemoryClusteredJobs(groupName);
099                            }
100                            else {
101                                    _schedulerEngine.delete(groupName);
102                            }
103                    }
104                    finally {
105                            _readLock.unlock();
106                    }
107    
108                    setClusterableThreadLocal(groupName);
109            }
110    
111            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
112            @Override
113            public void delete(String jobName, String groupName)
114                    throws SchedulerException {
115    
116                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
117    
118                    _readLock.lock();
119    
120                    try {
121                            if (memoryClusteredSlaveJob) {
122                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
123                            }
124                            else {
125                                    _schedulerEngine.delete(jobName, groupName);
126                            }
127                    }
128                    finally {
129                            _readLock.unlock();
130                    }
131    
132                    setClusterableThreadLocal(groupName);
133            }
134    
135            @Override
136            public String getBeanIdentifier() {
137                    return _beanIdentifier;
138            }
139    
140            @Override
141            public SchedulerResponse getScheduledJob(String jobName, String groupName)
142                    throws SchedulerException {
143    
144                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
145                            groupName);
146    
147                    StorageType storageType = objectValuePair.getValue();
148    
149                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
150                            String masterAddressString = getMasterAddressString(false);
151    
152                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
153                                    return callMaster(
154                                            masterAddressString, _getScheduledJobMethodKey, jobName,
155                                            objectValuePair.getKey(), storageType);
156                            }
157                    }
158    
159                    _readLock.lock();
160    
161                    try {
162                            return _schedulerEngine.getScheduledJob(jobName, groupName);
163                    }
164                    finally {
165                            _readLock.unlock();
166                    }
167            }
168    
169            @Override
170            public List<SchedulerResponse> getScheduledJobs()
171                    throws SchedulerException {
172    
173                    String masterAddressString = getMasterAddressString(false);
174    
175                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
176                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
177                    }
178    
179                    _readLock.lock();
180    
181                    try {
182                            return _schedulerEngine.getScheduledJobs();
183                    }
184                    finally {
185                            _readLock.unlock();
186                    }
187            }
188    
189            @Override
190            public List<SchedulerResponse> getScheduledJobs(String groupName)
191                    throws SchedulerException {
192    
193                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
194                            groupName);
195    
196                    StorageType storageType = objectValuePair.getValue();
197    
198                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
199                            String masterAddressString = getMasterAddressString(false);
200    
201                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
202                                    return callMaster(
203                                            masterAddressString, _getScheduledJobsMethodKey2,
204                                            objectValuePair.getKey(), storageType);
205                            }
206                    }
207    
208                    _readLock.lock();
209    
210                    try {
211                            return _schedulerEngine.getScheduledJobs(groupName);
212                    }
213                    finally {
214                            _readLock.unlock();
215                    }
216            }
217    
218            @Override
219            public void initialize() throws SchedulerException {
220                    try {
221                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
222    
223                            _readLock = readWriteLock.readLock();
224                            _writeLock = readWriteLock.writeLock();
225    
226                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
227                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
228    
229                            _clusterEventListener = new MemorySchedulerClusterEventListener();
230    
231                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
232    
233                            String masterAddressString = getMasterAddressString(false);
234    
235                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
236                                    List<SchedulerResponse> schedulerResponses = callMaster(
237                                            masterAddressString, _getScheduledJobsMethodKey3,
238                                            StorageType.MEMORY_CLUSTERED);
239    
240                                    initMemoryClusteredJobs(schedulerResponses);
241                            }
242                    }
243                    catch (Exception e) {
244                            throw new SchedulerException("Unable to initialize scheduler", e);
245                    }
246            }
247    
248            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
249            @Override
250            public void pause(String groupName) throws SchedulerException {
251                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
252    
253                    _readLock.lock();
254    
255                    try {
256                            if (memoryClusteredSlaveJob) {
257                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
258                            }
259                            else {
260                                    _schedulerEngine.pause(groupName);
261                            }
262                    }
263                    finally {
264                            _readLock.unlock();
265                    }
266    
267                    setClusterableThreadLocal(groupName);
268            }
269    
270            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
271            @Override
272            public void pause(String jobName, String groupName)
273                    throws SchedulerException {
274    
275                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
276    
277                    _readLock.lock();
278    
279                    try {
280                            if (memoryClusteredSlaveJob) {
281                                    updateMemoryClusteredJob(
282                                            jobName, groupName, TriggerState.PAUSED);
283                            }
284                            else {
285                                    _schedulerEngine.pause(jobName, groupName);
286                            }
287                    }
288                    finally {
289                            _readLock.unlock();
290                    }
291    
292                    setClusterableThreadLocal(groupName);
293            }
294    
295            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
296            @Override
297            public void resume(String groupName) throws SchedulerException {
298                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
299    
300                    _readLock.lock();
301    
302                    try {
303                            if (memoryClusteredSlaveJob) {
304                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
305                            }
306                            else {
307                                    _schedulerEngine.resume(groupName);
308                            }
309                    }
310                    finally {
311                            _readLock.unlock();
312                    }
313    
314                    setClusterableThreadLocal(groupName);
315            }
316    
317            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
318            @Override
319            public void resume(String jobName, String groupName)
320                    throws SchedulerException {
321    
322                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
323    
324                    _readLock.lock();
325    
326                    try {
327                            if (memoryClusteredSlaveJob) {
328                                    updateMemoryClusteredJob(
329                                            jobName, groupName, TriggerState.NORMAL);
330                            }
331                            else {
332                                    _schedulerEngine.resume(jobName, groupName);
333                            }
334                    }
335                    finally {
336                            _readLock.unlock();
337                    }
338    
339                    setClusterableThreadLocal(groupName);
340            }
341    
342            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
343            @Override
344            public void schedule(
345                            Trigger trigger, String description, String destinationName,
346                            Message message)
347                    throws SchedulerException {
348    
349                    String groupName = trigger.getGroupName();
350                    String jobName = trigger.getJobName();
351    
352                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
353    
354                    _readLock.lock();
355    
356                    try {
357                            if (memoryClusteredSlaveJob) {
358                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
359    
360                                    schedulerResponse.setDescription(description);
361                                    schedulerResponse.setDestinationName(destinationName);
362                                    schedulerResponse.setGroupName(groupName);
363                                    schedulerResponse.setJobName(jobName);
364                                    schedulerResponse.setMessage(message);
365                                    schedulerResponse.setTrigger(trigger);
366    
367                                    _memoryClusteredJobs.put(
368                                            getFullName(jobName, groupName),
369                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
370                                                    schedulerResponse, TriggerState.NORMAL));
371                            }
372                            else {
373                                    _schedulerEngine.schedule(
374                                            trigger, description, destinationName, message);
375                            }
376                    }
377                    finally {
378                            _readLock.unlock();
379                    }
380    
381                    setClusterableThreadLocal(groupName);
382            }
383    
384            @Override
385            public void setBeanIdentifier(String beanIdentifier) {
386                    _beanIdentifier = beanIdentifier;
387            }
388    
389            @Override
390            public void shutdown() throws SchedulerException {
391                    _portalReady = false;
392    
393                    try {
394                            ClusterExecutorUtil.removeClusterEventListener(
395                                    _clusterEventListener);
396    
397                            LockLocalServiceUtil.unlock(
398                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
399                    }
400                    catch (Exception e) {
401                            throw new SchedulerException("Unable to shutdown scheduler", e);
402                    }
403    
404                    _schedulerEngine.shutdown();
405            }
406    
407            @Override
408            public void start() throws SchedulerException {
409                    _schedulerEngine.start();
410    
411                    _portalReady = true;
412            }
413    
414            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
415            @Override
416            public void suppressError(String jobName, String groupName)
417                    throws SchedulerException {
418    
419                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
420    
421                    if (!memoryClusteredSlaveJob) {
422                            _readLock.lock();
423    
424                            try {
425                                    _schedulerEngine.suppressError(jobName, groupName);
426                            }
427                            finally {
428                                    _readLock.unlock();
429                            }
430                    }
431    
432                    setClusterableThreadLocal(groupName);
433            }
434    
435            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
436            @Override
437            public void unschedule(String groupName) throws SchedulerException {
438                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
439    
440                    _readLock.lock();
441    
442                    try {
443                            if (memoryClusteredSlaveJob) {
444                                    removeMemoryClusteredJobs(groupName);
445                            }
446                            else {
447                                    _schedulerEngine.unschedule(groupName);
448                            }
449                    }
450                    finally {
451                            _readLock.unlock();
452                    }
453    
454                    setClusterableThreadLocal(groupName);
455            }
456    
457            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
458            @Override
459            public void unschedule(String jobName, String groupName)
460                    throws SchedulerException {
461    
462                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
463    
464                    _readLock.lock();
465    
466                    try {
467                            if (memoryClusteredSlaveJob) {
468                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
469                            }
470                            else {
471                                    _schedulerEngine.unschedule(jobName, groupName);
472                            }
473                    }
474                    finally {
475                            _readLock.unlock();
476                    }
477    
478                    setClusterableThreadLocal(groupName);
479            }
480    
481            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
482            @Override
483            public void update(Trigger trigger) throws SchedulerException {
484                    String jobName = trigger.getJobName();
485                    String groupName = trigger.getGroupName();
486    
487                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
488    
489                    _readLock.lock();
490    
491                    try {
492                            if (memoryClusteredSlaveJob) {
493                                    boolean updated = false;
494    
495                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
496                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
497    
498                                            SchedulerResponse schedulerResponse =
499                                                    memoryClusteredJob.getKey();
500    
501                                            if (jobName.equals(schedulerResponse.getJobName()) &&
502                                                    groupName.equals(schedulerResponse.getGroupName())) {
503    
504                                                    schedulerResponse.setTrigger(trigger);
505    
506                                                    updated = true;
507    
508                                                    break;
509                                            }
510                                    }
511    
512                                    if (!updated) {
513                                            throw new SchedulerException(
514                                                    "Unable to update trigger for memory clustered job");
515                                    }
516                            }
517                            else {
518                                    _schedulerEngine.update(trigger);
519                            }
520                    }
521                    finally {
522                            _readLock.unlock();
523                    }
524    
525                    setClusterableThreadLocal(groupName);
526            }
527    
528            @Override
529            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
530                    getMasterAddressString(false);
531    
532                    return null;
533            }
534    
535            protected <T> T callMaster(
536                            String masterAddressString, MethodKey methodKey,
537                            Object... arguments)
538                    throws SchedulerException {
539    
540                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
541    
542                    Address address = AddressSerializerUtil.deserialize(
543                            masterAddressString);
544    
545                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
546                            methodHandler, address);
547    
548                    try {
549                            FutureClusterResponses futureClusterResponses =
550                                    ClusterExecutorUtil.execute(clusterRequest);
551    
552                            ClusterNodeResponses clusterNodeResponses =
553                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
554    
555                            ClusterNodeResponse clusterNodeResponse =
556                                    clusterNodeResponses.getClusterResponse(address);
557    
558                            return (T)clusterNodeResponse.getResult();
559                    }
560                    catch (Exception e) {
561                            throw new SchedulerException(
562                                    "Unable to load scheduled jobs from cluster node " +
563                                            address.getDescription(),
564                                    e);
565                    }
566            }
567    
568            protected String getFullName(String jobName, String groupName) {
569                    return groupName.concat(StringPool.PERIOD).concat(jobName);
570            }
571    
572            protected String getMasterAddressString(boolean asynchronous)
573                    throws SchedulerException {
574    
575                    String owner = null;
576    
577                    while (true) {
578                            try {
579                                    Lock lock = null;
580    
581                                    if (owner == null) {
582                                            lock = LockLocalServiceUtil.lock(
583                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
584                                                    _localClusterNodeAddress);
585                                    }
586                                    else {
587                                            lock = LockLocalServiceUtil.lock(
588                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
589                                                    _localClusterNodeAddress);
590                                    }
591    
592                                    owner = lock.getOwner();
593    
594                                    Address address = AddressSerializerUtil.deserialize(owner);
595    
596                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
597                                            break;
598                                    }
599                            }
600                            catch (Exception e) {
601                                    if (_log.isWarnEnabled()) {
602                                            _log.warn(
603                                                    "Unable to obtain memory scheduler cluster lock. " +
604                                                            "Trying again.");
605                                    }
606                            }
607                    }
608    
609                    boolean master = _localClusterNodeAddress.equals(owner);
610    
611                    if (master == _master) {
612                            return owner;
613                    }
614    
615                    if (master) {
616                            slaveToMaster();
617                    }
618                    else {
619                            masterToSlave(owner, asynchronous);
620                    }
621    
622                    return owner;
623            }
624    
625            protected void initMemoryClusteredJobs(
626                            List<SchedulerResponse> schedulerResponses)
627                    throws Exception {
628    
629                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
630                            Trigger oldTrigger = schedulerResponse.getTrigger();
631    
632                            String jobName = schedulerResponse.getJobName();
633                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
634                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
635    
636                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
637                                    oldTrigger.getTriggerType(), jobName, groupName,
638                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
639                                    oldTrigger.getTriggerContent());
640    
641                            schedulerResponse.setTrigger(newTrigger);
642    
643                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
644                                    schedulerResponse);
645    
646                            Message message = schedulerResponse.getMessage();
647    
648                            message.remove(JOB_STATE);
649    
650                            _memoryClusteredJobs.put(
651                                    getFullName(jobName, groupName),
652                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
653                                            schedulerResponse, triggerState));
654                    }
655            }
656    
657            protected boolean isMemoryClusteredSlaveJob(String groupName)
658                    throws SchedulerException {
659    
660                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
661                            groupName);
662    
663                    StorageType storageType = objectValuePair.getValue();
664    
665                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
666                            return false;
667                    }
668    
669                    String masterAddressString = getMasterAddressString(false);
670    
671                    if (_localClusterNodeAddress.equals(masterAddressString)) {
672                            return false;
673                    }
674    
675                    return true;
676            }
677    
678            protected void masterToSlave(
679                            String masterAddressString, boolean asynchronous)
680                    throws SchedulerException {
681    
682                    if (asynchronous) {
683                            MethodHandler methodHandler = new MethodHandler(
684                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
685    
686                            Address address = AddressSerializerUtil.deserialize(
687                                    masterAddressString);
688    
689                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
690                                    methodHandler, address);
691    
692                            try {
693                                    ClusterExecutorUtil.execute(
694                                            clusterRequest,
695                                            new MemorySchedulerClusterResponseCallback(address), 20,
696                                            TimeUnit.SECONDS);
697    
698                                    return;
699                            }
700                            catch (Exception e) {
701                                    throw new SchedulerException(
702                                            "Unable to load scheduled jobs from cluster node " +
703                                                    address.getDescription(),
704                                            e);
705                            }
706                    }
707    
708                    List<SchedulerResponse> schedulerResponses = callMaster(
709                            masterAddressString, _getScheduledJobsMethodKey3,
710                            StorageType.MEMORY_CLUSTERED);
711    
712                    _doMasterToSlave(schedulerResponses);
713            }
714    
715            protected void removeMemoryClusteredJobs(String groupName) {
716                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
717                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
718    
719                    Iterator
720                            <Map.Entry<String,
721                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
722                                            memoryClusteredJobs.iterator();
723    
724                    while (itr.hasNext()) {
725                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
726                                    entry = itr.next();
727    
728                            ObjectValuePair<SchedulerResponse, TriggerState>
729                                    memoryClusteredJob = entry.getValue();
730    
731                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
732    
733                            if (groupName.equals(schedulerResponse.getGroupName())) {
734                                    itr.remove();
735                            }
736                    }
737            }
738    
739            protected ObjectValuePair<String, StorageType> resolveGroupName(
740                    String groupName) {
741    
742                    int index = groupName.indexOf(CharPool.POUND);
743    
744                    String storageTypeString = groupName.substring(0, index);
745    
746                    StorageType storageType = StorageType.valueOf(storageTypeString);
747    
748                    String orginalGroupName = groupName.substring(index + 1);
749    
750                    return new ObjectValuePair<String, StorageType>(
751                            orginalGroupName, storageType);
752            }
753    
754            protected void setClusterableThreadLocal(String groupName) {
755                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
756                            groupName);
757    
758                    ClusterableContextThreadLocal.putThreadLocalContext(
759                            STORAGE_TYPE, objectValuePair.getValue());
760                    ClusterableContextThreadLocal.putThreadLocalContext(
761                            _PORTAL_READY, _portalReady);
762    
763                    boolean pluginReady = true;
764    
765                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
766                            PluginContextLifecycleThreadLocal.isDestroying()) {
767    
768                            pluginReady = false;
769                    }
770    
771                    ClusterableContextThreadLocal.putThreadLocalContext(
772                            _PLUGIN_READY, pluginReady);
773            }
774    
775            protected void slaveToMaster() throws SchedulerException {
776                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
777    
778                    ProxyModeThreadLocal.setForceSync(true);
779    
780                    _writeLock.lock();
781    
782                    try {
783                            for (ObjectValuePair<SchedulerResponse, TriggerState>
784                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
785    
786                                    SchedulerResponse schedulerResponse =
787                                            memoryClusteredJob.getKey();
788    
789                                    _schedulerEngine.schedule(
790                                            schedulerResponse.getTrigger(),
791                                            schedulerResponse.getDescription(),
792                                            schedulerResponse.getDestinationName(),
793                                            schedulerResponse.getMessage());
794    
795                                    TriggerState triggerState = memoryClusteredJob.getValue();
796    
797                                    if (triggerState.equals(TriggerState.PAUSED)) {
798                                            _schedulerEngine.pause(
799                                                    schedulerResponse.getJobName(),
800                                                    schedulerResponse.getGroupName());
801                                    }
802                            }
803    
804                            _memoryClusteredJobs.clear();
805                    }
806                    finally {
807                            ProxyModeThreadLocal.setForceSync(forceSync);
808    
809                            _master = true;
810    
811                            _writeLock.unlock();
812                    }
813            }
814    
815            protected void updateMemoryClusteredJob(
816                    String jobName, String groupName, TriggerState triggerState) {
817    
818                    ObjectValuePair<SchedulerResponse, TriggerState>
819                            memoryClusteredJob = _memoryClusteredJobs.get(
820                                    getFullName(jobName, groupName));
821    
822                    if (memoryClusteredJob != null) {
823                            memoryClusteredJob.setValue(triggerState);
824                    }
825            }
826    
827            protected void updateMemoryClusteredJobs(
828                    String groupName, TriggerState triggerState) {
829    
830                    for (ObjectValuePair<SchedulerResponse, TriggerState>
831                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
832    
833                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
834    
835                            if (groupName.equals(schedulerResponse.getGroupName())) {
836                                    memoryClusteredJob.setValue(triggerState);
837                            }
838                    }
839            }
840    
841            @BeanReference(
842                    name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
843            protected SchedulerEngine schedulerEngine;
844    
845            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
846                    throws SchedulerException {
847    
848                    _writeLock.lock();
849    
850                    try {
851                            for (SchedulerResponse schedulerResponse :
852                                            _schedulerEngine.getScheduledJobs()) {
853    
854                                    if (StorageType.MEMORY_CLUSTERED ==
855                                                    schedulerResponse.getStorageType()) {
856    
857                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
858    
859                                            groupName = groupName.concat(StringPool.POUND).concat(
860                                                    schedulerResponse.getGroupName());
861    
862                                            _schedulerEngine.delete(
863                                                    schedulerResponse.getJobName(), groupName);
864                                    }
865                            }
866    
867                            initMemoryClusteredJobs(schedulerResponses);
868    
869                            if (_log.isInfoEnabled()) {
870                                    _log.info("Switched current node from master to slave");
871                            }
872                    }
873                    catch (Exception e) {
874                            throw new SchedulerException(e);
875                    }
876                    finally {
877                            _master = false;
878    
879                            _writeLock.unlock();
880                    }
881            }
882    
883            private static final String _LOCK_CLASS_NAME =
884                    SchedulerEngine.class.getName();
885    
886            private static final String _PLUGIN_READY = "plugin.ready";
887    
888            private static final String _PORTAL_READY = "portal.ready";
889    
890            private static Log _log = LogFactoryUtil.getLog(
891                    ClusterSchedulerEngine.class);
892    
893            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
894                    SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
895                    String.class, StorageType.class);
896            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
897                    SchedulerEngineHelperUtil.class, "getScheduledJobs");
898            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
899                    SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
900                    StorageType.class);
901            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
902                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
903    
904            private String _beanIdentifier;
905            private ClusterEventListener _clusterEventListener;
906            private volatile String _localClusterNodeAddress;
907            private volatile boolean _master;
908            private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
909                    _memoryClusteredJobs = new ConcurrentHashMap
910                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
911            private boolean _portalReady;
912            private java.util.concurrent.locks.Lock _readLock;
913            private SchedulerEngine _schedulerEngine;
914            private java.util.concurrent.locks.Lock _writeLock;
915    
916            private static class SchedulerClusterInvokeAcceptor
917                    implements ClusterInvokeAcceptor {
918    
919                    @Override
920                    public boolean accept(Map<String, Serializable> context) {
921                            if (ClusterInvokeThreadLocal.isEnabled()) {
922                                    return true;
923                            }
924    
925                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
926                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
927                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
928    
929                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
930                                    !pluginReady) {
931    
932                                    return false;
933                            }
934    
935                            return true;
936                    }
937    
938            }
939    
940            private class MemorySchedulerClusterEventListener
941                    implements ClusterEventListener {
942    
943                    @Override
944                    public void processClusterEvent(ClusterEvent clusterEvent) {
945                            try {
946                                    getMasterAddressString(true);
947                            }
948                            catch (Exception e) {
949                                    _log.error("Unable to update memory scheduler cluster lock", e);
950                            }
951                    }
952    
953            }
954    
955            private class MemorySchedulerClusterResponseCallback
956                    extends BaseClusterResponseCallback {
957    
958                    public MemorySchedulerClusterResponseCallback(Address address) {
959                            _address = address;
960                    }
961    
962                    @Override
963                    public void callback(ClusterNodeResponses clusterNodeResponses) {
964                            try {
965                                    ClusterNodeResponse clusterNodeResponse =
966                                            clusterNodeResponses.getClusterResponse(_address);
967    
968                                    List<SchedulerResponse> schedulerResponses =
969                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
970    
971                                    _doMasterToSlave(schedulerResponses);
972                            }
973                            catch (Exception e) {
974                                    _log.error(
975                                            "Unable to load memory clustered jobs from cluster node " +
976                                                    _address.getDescription(),
977                                            e);
978                            }
979                    }
980    
981                    @Override
982                    public void processTimeoutException(TimeoutException timeoutException) {
983                            _log.error(
984                                    "Unable to load memory clustered jobs from cluster node " +
985                                            _address.getDescription(),
986                                    timeoutException);
987                    }
988    
989                    private Address _address;
990    
991            }
992    
993    }