001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019    import com.liferay.portal.kernel.bean.BeanReference;
020    import com.liferay.portal.kernel.bean.IdentifiableBean;
021    import com.liferay.portal.kernel.cluster.Address;
022    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023    import com.liferay.portal.kernel.cluster.ClusterEvent;
024    import com.liferay.portal.kernel.cluster.ClusterEventListener;
025    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029    import com.liferay.portal.kernel.cluster.ClusterRequest;
030    import com.liferay.portal.kernel.cluster.Clusterable;
031    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034    import com.liferay.portal.kernel.log.Log;
035    import com.liferay.portal.kernel.log.LogFactoryUtil;
036    import com.liferay.portal.kernel.messaging.Message;
037    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041    import com.liferay.portal.kernel.scheduler.SchedulerException;
042    import com.liferay.portal.kernel.scheduler.StorageType;
043    import com.liferay.portal.kernel.scheduler.Trigger;
044    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045    import com.liferay.portal.kernel.scheduler.TriggerState;
046    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048    import com.liferay.portal.kernel.util.Base64;
049    import com.liferay.portal.kernel.util.CharPool;
050    import com.liferay.portal.kernel.util.MethodHandler;
051    import com.liferay.portal.kernel.util.MethodKey;
052    import com.liferay.portal.kernel.util.ObjectValuePair;
053    import com.liferay.portal.kernel.util.StringPool;
054    import com.liferay.portal.model.Lock;
055    import com.liferay.portal.service.LockLocalServiceUtil;
056    import com.liferay.portal.util.PropsValues;
057    
058    import java.io.ObjectInputStream;
059    import java.io.ObjectOutputStream;
060    import java.io.Serializable;
061    
062    import java.util.Iterator;
063    import java.util.List;
064    import java.util.Map;
065    import java.util.Set;
066    import java.util.concurrent.ConcurrentHashMap;
067    import java.util.concurrent.TimeUnit;
068    import java.util.concurrent.TimeoutException;
069    import java.util.concurrent.locks.ReadWriteLock;
070    import java.util.concurrent.locks.ReentrantReadWriteLock;
071    
072    /**
073     * @author Tina Tian
074     */
075    public class ClusterSchedulerEngine
076            implements IdentifiableBean, SchedulerEngine,
077                               SchedulerEngineClusterManager {
078    
079            public static SchedulerEngine createClusterSchedulerEngine(
080                    SchedulerEngine schedulerEngine) {
081    
082                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084                    }
085    
086                    return schedulerEngine;
087            }
088    
089            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
090                    _schedulerEngine = schedulerEngine;
091            }
092    
093            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094            public void delete(String groupName) throws SchedulerException {
095                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
096    
097                    _readLock.lock();
098    
099                    try {
100                            if (memoryClusteredSlaveJob) {
101                                    removeMemoryClusteredJobs(groupName);
102                            }
103                            else {
104                                    _schedulerEngine.delete(groupName);
105                            }
106                    }
107                    finally {
108                            _readLock.unlock();
109                    }
110    
111                    setClusterableThreadLocal(groupName);
112            }
113    
114            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
115            public void delete(String jobName, String groupName)
116                    throws SchedulerException {
117    
118                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
119    
120                    _readLock.lock();
121    
122                    try {
123                            if (memoryClusteredSlaveJob) {
124                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
125                            }
126                            else {
127                                    _schedulerEngine.delete(jobName, groupName);
128                            }
129                    }
130                    finally {
131                            _readLock.unlock();
132                    }
133    
134                    setClusterableThreadLocal(groupName);
135            }
136    
137            public String getBeanIdentifier() {
138                    return _beanIdentifier;
139            }
140    
141            public SchedulerResponse getScheduledJob(String jobName, String groupName)
142                    throws SchedulerException {
143    
144                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
145                            groupName);
146    
147                    StorageType storageType = objectValuePair.getValue();
148    
149                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
150                            String masterAddressString = getMasterAddressString(false);
151    
152                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
153                                    return (SchedulerResponse)callMaster(
154                                            masterAddressString, _getScheduledJobMethodKey, jobName,
155                                            objectValuePair.getKey(), storageType);
156                            }
157                    }
158    
159                    _readLock.lock();
160    
161                    try {
162                            return _schedulerEngine.getScheduledJob(jobName, groupName);
163                    }
164                    finally {
165                            _readLock.unlock();
166                    }
167            }
168    
169            public List<SchedulerResponse> getScheduledJobs()
170                    throws SchedulerException {
171    
172                    String masterAddressString = getMasterAddressString(false);
173    
174                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
175                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
176                    }
177    
178                    _readLock.lock();
179    
180                    try {
181                            return _schedulerEngine.getScheduledJobs();
182                    }
183                    finally {
184                            _readLock.unlock();
185                    }
186            }
187    
188            public List<SchedulerResponse> getScheduledJobs(String groupName)
189                    throws SchedulerException {
190    
191                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
192                            groupName);
193    
194                    StorageType storageType = objectValuePair.getValue();
195    
196                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
197                            String masterAddressString = getMasterAddressString(false);
198    
199                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
200                                    return callMaster(
201                                            masterAddressString, _getScheduledJobsMethodKey2,
202                                            objectValuePair.getKey(), storageType);
203                            }
204                    }
205    
206                    _readLock.lock();
207    
208                    try {
209                            return _schedulerEngine.getScheduledJobs(groupName);
210                    }
211                    finally {
212                            _readLock.unlock();
213                    }
214            }
215    
216            public void initialize() throws SchedulerException {
217                    try {
218                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
219    
220                            _readLock = readWriteLock.readLock();
221                            _writeLock = readWriteLock.writeLock();
222    
223                            _localClusterNodeAddress = getSerializedString(
224                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
225    
226                            _clusterEventListener = new MemorySchedulerClusterEventListener();
227    
228                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
229    
230                            String masterAddressString = getMasterAddressString(false);
231    
232                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
233                                    List<SchedulerResponse> schedulerResponses = callMaster(
234                                            masterAddressString, _getScheduledJobsMethodKey3,
235                                            StorageType.MEMORY_CLUSTERED);
236    
237                                    initMemoryClusteredJobs(schedulerResponses);
238                            }
239                    }
240                    catch (Exception e) {
241                            throw new SchedulerException("Unable to initialize scheduler", e);
242                    }
243            }
244    
245            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
246            public void pause(String groupName) throws SchedulerException {
247                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
248    
249                    _readLock.lock();
250    
251                    try {
252                            if (memoryClusteredSlaveJob) {
253                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
254                            }
255                            else {
256                                    _schedulerEngine.pause(groupName);
257                            }
258                    }
259                    finally {
260                            _readLock.unlock();
261                    }
262    
263                    setClusterableThreadLocal(groupName);
264            }
265    
266            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
267            public void pause(String jobName, String groupName)
268                    throws SchedulerException {
269    
270                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
271    
272                    _readLock.lock();
273    
274                    try {
275                            if (memoryClusteredSlaveJob) {
276                                    updateMemoryClusteredJob(
277                                            jobName, groupName, TriggerState.PAUSED);
278                            }
279                            else {
280                                    _schedulerEngine.pause(jobName, groupName);
281                            }
282                    }
283                    finally {
284                            _readLock.unlock();
285                    }
286    
287                    setClusterableThreadLocal(groupName);
288            }
289    
290            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
291            public void resume(String groupName) throws SchedulerException {
292                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
293    
294                    _readLock.lock();
295    
296                    try {
297                            if (memoryClusteredSlaveJob) {
298                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
299                            }
300                            else {
301                                    _schedulerEngine.resume(groupName);
302                            }
303                    }
304                    finally {
305                            _readLock.unlock();
306                    }
307    
308                    setClusterableThreadLocal(groupName);
309            }
310    
311            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
312            public void resume(String jobName, String groupName)
313                    throws SchedulerException {
314    
315                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
316    
317                    _readLock.lock();
318    
319                    try {
320                            if (memoryClusteredSlaveJob) {
321                                    updateMemoryClusteredJob(
322                                            jobName, groupName, TriggerState.NORMAL);
323                            }
324                            else {
325                                    _schedulerEngine.resume(jobName, groupName);
326                            }
327                    }
328                    finally {
329                            _readLock.unlock();
330                    }
331    
332                    setClusterableThreadLocal(groupName);
333            }
334    
335            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
336            public void schedule(
337                            Trigger trigger, String description, String destinationName,
338                            Message message)
339                    throws SchedulerException {
340    
341                    String groupName = trigger.getGroupName();
342                    String jobName = trigger.getJobName();
343    
344                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
345    
346                    _readLock.lock();
347    
348                    try {
349                            if (memoryClusteredSlaveJob) {
350                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
351    
352                                    schedulerResponse.setDescription(description);
353                                    schedulerResponse.setDestinationName(destinationName);
354                                    schedulerResponse.setGroupName(groupName);
355                                    schedulerResponse.setJobName(jobName);
356                                    schedulerResponse.setMessage(message);
357                                    schedulerResponse.setTrigger(trigger);
358    
359                                    _memoryClusteredJobs.put(
360                                            getFullName(jobName, groupName),
361                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
362                                                    schedulerResponse, TriggerState.NORMAL));
363                            }
364                            else {
365                                    _schedulerEngine.schedule(
366                                            trigger, description, destinationName, message);
367                            }
368                    }
369                    finally {
370                            _readLock.unlock();
371                    }
372    
373                    setClusterableThreadLocal(groupName);
374            }
375    
376            public void setBeanIdentifier(String beanIdentifier) {
377                    _beanIdentifier = beanIdentifier;
378            }
379    
380            public void shutdown() throws SchedulerException {
381                    _portalReady = false;
382    
383                    try {
384                            ClusterExecutorUtil.removeClusterEventListener(
385                                    _clusterEventListener);
386    
387                            LockLocalServiceUtil.unlock(
388                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
389                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
390                    }
391                    catch (Exception e) {
392                            throw new SchedulerException("Unable to shutdown scheduler", e);
393                    }
394    
395                    _schedulerEngine.shutdown();
396            }
397    
398            public void start() throws SchedulerException {
399                    _schedulerEngine.start();
400    
401                    _portalReady = true;
402            }
403    
404            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
405            public void suppressError(String jobName, String groupName)
406                    throws SchedulerException {
407    
408                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
409    
410                    if (!memoryClusteredSlaveJob) {
411                            _readLock.lock();
412    
413                            try {
414                                    _schedulerEngine.suppressError(jobName, groupName);
415                            }
416                            finally {
417                                    _readLock.unlock();
418                            }
419                    }
420    
421                    setClusterableThreadLocal(groupName);
422            }
423    
424            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
425            public void unschedule(String groupName) throws SchedulerException {
426                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
427    
428                    _readLock.lock();
429    
430                    try {
431                            if (memoryClusteredSlaveJob) {
432                                    removeMemoryClusteredJobs(groupName);
433                            }
434                            else {
435                                    _schedulerEngine.unschedule(groupName);
436                            }
437                    }
438                    finally {
439                            _readLock.unlock();
440                    }
441    
442                    setClusterableThreadLocal(groupName);
443            }
444    
445            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
446            public void unschedule(String jobName, String groupName)
447                    throws SchedulerException {
448    
449                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
450    
451                    _readLock.lock();
452    
453                    try {
454                            if (memoryClusteredSlaveJob) {
455                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
456                            }
457                            else {
458                                    _schedulerEngine.unschedule(jobName, groupName);
459                            }
460                    }
461                    finally {
462                            _readLock.unlock();
463                    }
464    
465                    setClusterableThreadLocal(groupName);
466            }
467    
468            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
469            public void update(Trigger trigger) throws SchedulerException {
470                    String jobName = trigger.getJobName();
471                    String groupName = trigger.getGroupName();
472    
473                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
474    
475                    _readLock.lock();
476    
477                    try {
478                            if (memoryClusteredSlaveJob) {
479                                    boolean updated = false;
480    
481                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
482                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
483    
484                                            SchedulerResponse schedulerResponse =
485                                                    memoryClusteredJob.getKey();
486    
487                                            if (jobName.equals(schedulerResponse.getJobName()) &&
488                                                    groupName.equals(schedulerResponse.getGroupName())) {
489    
490                                                    schedulerResponse.setTrigger(trigger);
491    
492                                                    updated = true;
493    
494                                                    break;
495                                            }
496                                    }
497    
498                                    if (!updated) {
499                                            throw new SchedulerException(
500                                                    "Unable to update trigger for memory clustered job");
501                                    }
502                            }
503                            else {
504                                    _schedulerEngine.update(trigger);
505                            }
506                    }
507                    finally {
508                            _readLock.unlock();
509                    }
510    
511                    setClusterableThreadLocal(groupName);
512            }
513    
514            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
515                    getMasterAddressString(false);
516    
517                    return null;
518            }
519    
520            protected <T> T callMaster(
521                            String masterAddressString, MethodKey methodKey,
522                            Object... arguments)
523                    throws SchedulerException {
524    
525                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
526    
527                    Address address = (Address)getDeserializedObject(masterAddressString);
528    
529                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
530                            methodHandler, address);
531    
532                    try {
533                            FutureClusterResponses futureClusterResponses =
534                                    ClusterExecutorUtil.execute(clusterRequest);
535    
536                            ClusterNodeResponses clusterNodeResponses =
537                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
538    
539                            ClusterNodeResponse clusterNodeResponse =
540                                    clusterNodeResponses.getClusterResponse(address);
541    
542                            return (T)clusterNodeResponse.getResult();
543                    }
544                    catch (Exception e) {
545                            throw new SchedulerException(
546                                    "Unable to load scheduled jobs from cluster node " +
547                                            address.getDescription(),
548                                    e);
549                    }
550            }
551    
552            protected Object getDeserializedObject(String string)
553                    throws SchedulerException {
554    
555                    byte[] bytes = Base64.decode(string);
556    
557                    UnsyncByteArrayInputStream byteArrayInputStream =
558                            new UnsyncByteArrayInputStream(bytes);
559    
560                    ObjectInputStream objectInputStream = null;
561    
562                    try {
563                            objectInputStream = new ObjectInputStream(byteArrayInputStream);
564    
565                            Object object = objectInputStream.readObject();
566    
567                            return object;
568                    }
569                    catch (Exception e) {
570                            throw new SchedulerException(
571                                    "Unable to deserialize object from " + string, e);
572                    }
573                    finally {
574                            try {
575                                    objectInputStream.close();
576                            }
577                            catch (Exception e) {
578                            }
579                    }
580            }
581    
582            protected String getFullName(String jobName, String groupName) {
583                    return groupName.concat(StringPool.PERIOD).concat(jobName);
584            }
585    
586            protected String getMasterAddressString(boolean asynchronous)
587                    throws SchedulerException {
588    
589                    String owner = null;
590    
591                    Lock lock = null;
592    
593                    while (true) {
594                            try {
595                                    if (owner == null) {
596                                            lock = LockLocalServiceUtil.lock(
597                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
598                                                    _localClusterNodeAddress,
599                                                    PropsValues.
600                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
601                                    }
602                                    else {
603                                            lock = LockLocalServiceUtil.lock(
604                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
605                                                    _localClusterNodeAddress,
606                                                    PropsValues.
607                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
608                                    }
609    
610                                    Address address = (Address)getDeserializedObject(
611                                            lock.getOwner());
612    
613                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
614                                            break;
615                                    }
616                                    else {
617                                            owner = lock.getOwner();
618                                    }
619                            }
620                            catch (Exception e) {
621                                    if (_log.isWarnEnabled()) {
622                                            _log.warn(
623                                                    "Unable to obtain memory scheduler cluster lock. " +
624                                                            "Trying again.");
625                                    }
626                            }
627                    }
628    
629                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
630    
631                    if (master == _master) {
632                            return lock.getOwner();
633                    }
634    
635                    if (master) {
636                            slaveToMaster();
637                    }
638                    else {
639                            masterToSlave(lock.getOwner(), asynchronous);
640                    }
641    
642                    return lock.getOwner();
643            }
644    
645            protected String getSerializedString(Object object) throws Exception {
646                    UnsyncByteArrayOutputStream byteArrayOutputStream =
647                            new UnsyncByteArrayOutputStream();
648    
649                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
650                            byteArrayOutputStream);
651    
652                    objectOutputStream.writeObject(object);
653                    objectOutputStream.close();
654    
655                    byte[] bytes = byteArrayOutputStream.toByteArray();
656    
657                    return Base64.encode(bytes);
658            }
659    
660            protected void initMemoryClusteredJobs(
661                            List<SchedulerResponse> schedulerResponses)
662                    throws Exception {
663    
664                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
665                            Trigger oldTrigger = schedulerResponse.getTrigger();
666    
667                            String jobName = schedulerResponse.getJobName();
668                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
669                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
670    
671                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
672                                    oldTrigger.getTriggerType(), jobName, groupName,
673                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
674                                    oldTrigger.getTriggerContent());
675    
676                            schedulerResponse.setTrigger(newTrigger);
677    
678                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
679                                    schedulerResponse);
680    
681                            Message message = schedulerResponse.getMessage();
682    
683                            message.remove(JOB_STATE);
684    
685                            _memoryClusteredJobs.put(
686                                    getFullName(jobName, groupName),
687                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
688                                            schedulerResponse, triggerState));
689                    }
690            }
691    
692            protected boolean isMemoryClusteredSlaveJob(String groupName)
693                    throws SchedulerException {
694    
695                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
696                            groupName);
697    
698                    StorageType storageType = objectValuePair.getValue();
699    
700                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
701                            return false;
702                    }
703    
704                    String masterAddressString = getMasterAddressString(false);
705    
706                    if (_localClusterNodeAddress.equals(masterAddressString)) {
707                            return false;
708                    }
709    
710                    return true;
711            }
712    
713            protected void masterToSlave(
714                            String masterAddressString, boolean asynchronous)
715                    throws SchedulerException {
716    
717                    if (asynchronous) {
718                            MethodHandler methodHandler = new MethodHandler(
719                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
720    
721                            Address address = (Address)getDeserializedObject(
722                                    masterAddressString);
723    
724                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
725                                    methodHandler, address);
726    
727                            try {
728                                    ClusterExecutorUtil.execute(
729                                            clusterRequest,
730                                            new MemorySchedulerClusterResponseCallback(address), 20,
731                                            TimeUnit.SECONDS);
732    
733                                    return;
734                            }
735                            catch (Exception e) {
736                                    throw new SchedulerException(
737                                            "Unable to load scheduled jobs from cluster node " +
738                                                    address.getDescription(),
739                                            e);
740                            }
741                    }
742    
743                    List<SchedulerResponse> schedulerResponses = callMaster(
744                            masterAddressString, _getScheduledJobsMethodKey3,
745                            StorageType.MEMORY_CLUSTERED);
746    
747                    _doMasterToSlave(schedulerResponses);
748            }
749    
750            protected void removeMemoryClusteredJobs(String groupName) {
751                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
752                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
753    
754                    Iterator
755                            <Map.Entry<String,
756                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
757                                            memoryClusteredJobs.iterator();
758    
759                    while (itr.hasNext()) {
760                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
761                                    entry = itr.next();
762    
763                            ObjectValuePair<SchedulerResponse, TriggerState>
764                                    memoryClusteredJob = entry.getValue();
765    
766                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
767    
768                            if (groupName.equals(schedulerResponse.getGroupName())) {
769                                    itr.remove();
770                            }
771                    }
772            }
773    
774            protected ObjectValuePair<String, StorageType> resolveGroupName(
775                    String groupName) {
776    
777                    int index = groupName.indexOf(CharPool.POUND);
778    
779                    String storageTypeString = groupName.substring(0, index);
780    
781                    StorageType storageType = StorageType.valueOf(storageTypeString);
782    
783                    String orginalGroupName = groupName.substring(index + 1);
784    
785                    return new ObjectValuePair<String, StorageType>(
786                            orginalGroupName, storageType);
787            }
788    
789            protected void setClusterableThreadLocal(String groupName) {
790                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
791                            groupName);
792    
793                    ClusterableContextThreadLocal.putThreadLocalContext(
794                            STORAGE_TYPE, objectValuePair.getValue());
795                    ClusterableContextThreadLocal.putThreadLocalContext(
796                            _PORTAL_READY, _portalReady);
797    
798                    boolean pluginReady = true;
799    
800                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
801                            PluginContextLifecycleThreadLocal.isDestroying()) {
802    
803                            pluginReady = false;
804                    }
805    
806                    ClusterableContextThreadLocal.putThreadLocalContext(
807                            _PLUGIN_READY, pluginReady);
808            }
809    
810            protected void slaveToMaster() throws SchedulerException {
811                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
812    
813                    ProxyModeThreadLocal.setForceSync(true);
814    
815                    _writeLock.lock();
816    
817                    try {
818                            for (ObjectValuePair<SchedulerResponse, TriggerState>
819                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
820    
821                                    SchedulerResponse schedulerResponse =
822                                            memoryClusteredJob.getKey();
823    
824                                    _schedulerEngine.schedule(
825                                            schedulerResponse.getTrigger(),
826                                            schedulerResponse.getDescription(),
827                                            schedulerResponse.getDestinationName(),
828                                            schedulerResponse.getMessage());
829    
830                                    TriggerState triggerState = memoryClusteredJob.getValue();
831    
832                                    if (triggerState.equals(TriggerState.PAUSED)) {
833                                            _schedulerEngine.pause(
834                                                    schedulerResponse.getJobName(),
835                                                    schedulerResponse.getGroupName());
836                                    }
837                            }
838    
839                            _memoryClusteredJobs.clear();
840                    }
841                    finally {
842                            ProxyModeThreadLocal.setForceSync(forceSync);
843    
844                            _master = true;
845    
846                            _writeLock.unlock();
847                    }
848            }
849    
850            protected void updateMemoryClusteredJob(
851                    String jobName, String groupName, TriggerState triggerState) {
852    
853                    ObjectValuePair<SchedulerResponse, TriggerState>
854                            memoryClusteredJob = _memoryClusteredJobs.get(
855                                    getFullName(jobName, groupName));
856    
857                    if (memoryClusteredJob != null) {
858                            memoryClusteredJob.setValue(triggerState);
859                    }
860            }
861    
862            protected void updateMemoryClusteredJobs(
863                    String groupName, TriggerState triggerState) {
864    
865                    for (ObjectValuePair<SchedulerResponse, TriggerState>
866                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
867    
868                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
869    
870                            if (groupName.equals(schedulerResponse.getGroupName())) {
871                                    memoryClusteredJob.setValue(triggerState);
872                            }
873                    }
874            }
875    
        // Bean reference resolved by name to the ClusterSchedulerEngineService
        // bean. NOTE(review): this is a separate field from the private
        // _schedulerEngine delegate used by the methods in this class; confirm
        // where each one is assigned and used.

        @BeanReference(
                name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;
879    
880            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
881                    throws SchedulerException {
882    
883                    _writeLock.lock();
884    
885                    try {
886                            for (SchedulerResponse schedulerResponse :
887                                            _schedulerEngine.getScheduledJobs()) {
888    
889                                    if (StorageType.MEMORY_CLUSTERED ==
890                                                    schedulerResponse.getStorageType()) {
891    
892                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
893    
894                                            groupName = groupName.concat(StringPool.POUND).concat(
895                                                    schedulerResponse.getGroupName());
896    
897                                            _schedulerEngine.delete(
898                                                    schedulerResponse.getJobName(), groupName);
899                                    }
900                            }
901    
902                            initMemoryClusteredJobs(schedulerResponses);
903    
904                            if (_log.isInfoEnabled()) {
905                                    _log.info("Switched current node from master to slave");
906                            }
907                    }
908                    catch (Exception e) {
909                            throw new SchedulerException(e);
910                    }
911                    finally {
912                            _master = false;
913    
914                            _writeLock.unlock();
915                    }
916            }
917    
918            private static final String _LOCK_CLASS_NAME =
919                    SchedulerEngine.class.getName();
920    
921            private static final String _PLUGIN_READY = "plugin.ready";
922    
923            private static final String _PORTAL_READY = "portal.ready";
924    
925            private static Log _log = LogFactoryUtil.getLog(
926                    ClusterSchedulerEngine.class);
927    
928            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
929                    SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
930                    String.class, StorageType.class);
931            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
932                    SchedulerEngineHelperUtil.class, "getScheduledJobs");
933            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
934                    SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
935                    StorageType.class);
936            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
937                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
938    
        // NOTE(review): presumably backs the IdentifiableBean accessors; they are
        // not visible in this part of the file - confirm.
        private String _beanIdentifier;

        // NOTE(review): presumably holds a MemorySchedulerClusterEventListener;
        // the assignment is not visible here - confirm.
        private ClusterEventListener _clusterEventListener;

        // Compared with the master address string in isMemoryClusteredSlaveJob
        private volatile String _localClusterNodeAddress;

        // Set to true at the end of slaveToMaster and to false at the end of
        // _doMasterToSlave
        private volatile boolean _master;

        // Cache of memory clustered jobs keyed by getFullName(jobName,
        // groupName). Each value pairs the scheduler response with its trigger
        // state. Filled by initMemoryClusteredJobs, cleared by slaveToMaster.
        private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
                _memoryClusteredJobs = new ConcurrentHashMap
                        <String, ObjectValuePair<SchedulerResponse, TriggerState>>();

        // Published to the clusterable context by setClusterableThreadLocal
        private boolean _portalReady;

        // NOTE(review): not used in the visible part of the class; presumably
        // the read side paired with _writeLock - confirm.
        private java.util.concurrent.locks.Lock _readLock;

        // Engine on which jobs are scheduled, paused, listed, and deleted
        private SchedulerEngine _schedulerEngine;

        // Held by slaveToMaster and _doMasterToSlave while switching roles
        private java.util.concurrent.locks.Lock _writeLock;
950    
951            private static class SchedulerClusterInvokeAcceptor
952                    implements ClusterInvokeAcceptor {
953    
954                    public boolean accept(Map<String, Serializable> context) {
955                            if (ClusterInvokeThreadLocal.isEnabled()) {
956                                    return true;
957                            }
958    
959                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
960                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
961                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
962    
963                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
964                                    !pluginReady) {
965    
966                                    return false;
967                            }
968    
969                            return true;
970                    }
971    
972            }
973    
974            private class MemorySchedulerClusterEventListener
975                    implements ClusterEventListener {
976    
977                    public void processClusterEvent(ClusterEvent clusterEvent) {
978                            try {
979                                    getMasterAddressString(true);
980                            }
981                            catch (Exception e) {
982                                    _log.error("Unable to update memory scheduler cluster lock", e);
983                            }
984                    }
985    
986            }
987    
988            private class MemorySchedulerClusterResponseCallback
989                    extends BaseClusterResponseCallback {
990    
991                    public MemorySchedulerClusterResponseCallback(Address address) {
992                            _address = address;
993                    }
994    
995                    @Override
996                    public void callback(ClusterNodeResponses clusterNodeResponses) {
997                            try {
998                                    ClusterNodeResponse clusterNodeResponse =
999                                            clusterNodeResponses.getClusterResponse(_address);
1000    
1001                                    List<SchedulerResponse> schedulerResponses =
1002                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
1003    
1004                                    _doMasterToSlave(schedulerResponses);
1005                            }
1006                            catch (Exception e) {
1007                                    _log.error(
1008                                            "Unable to load memory clustered jobs from cluster node " +
1009                                                    _address.getDescription(),
1010                                            e);
1011                            }
1012                    }
1013    
1014                    @Override
1015                    public void processTimeoutException(TimeoutException timeoutException) {
1016                            _log.error(
1017                                    "Unable to load memory clustered jobs from cluster node " +
1018                                            _address.getDescription(),
1019                                    timeoutException);
1020                    }
1021    
1022                    private Address _address;
1023    
1024            }
1025    
1026    }