001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019    import com.liferay.portal.kernel.bean.BeanReference;
020    import com.liferay.portal.kernel.bean.IdentifiableBean;
021    import com.liferay.portal.kernel.cluster.Address;
022    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023    import com.liferay.portal.kernel.cluster.ClusterEvent;
024    import com.liferay.portal.kernel.cluster.ClusterEventListener;
025    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029    import com.liferay.portal.kernel.cluster.ClusterRequest;
030    import com.liferay.portal.kernel.cluster.Clusterable;
031    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034    import com.liferay.portal.kernel.log.Log;
035    import com.liferay.portal.kernel.log.LogFactoryUtil;
036    import com.liferay.portal.kernel.messaging.Message;
037    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041    import com.liferay.portal.kernel.scheduler.SchedulerException;
042    import com.liferay.portal.kernel.scheduler.StorageType;
043    import com.liferay.portal.kernel.scheduler.Trigger;
044    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045    import com.liferay.portal.kernel.scheduler.TriggerState;
046    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048    import com.liferay.portal.kernel.util.Base64;
049    import com.liferay.portal.kernel.util.CharPool;
050    import com.liferay.portal.kernel.util.MethodHandler;
051    import com.liferay.portal.kernel.util.MethodKey;
052    import com.liferay.portal.kernel.util.ObjectValuePair;
053    import com.liferay.portal.kernel.util.StringPool;
054    import com.liferay.portal.model.Lock;
055    import com.liferay.portal.service.LockLocalServiceUtil;
056    import com.liferay.portal.util.PropsValues;
057    
058    import java.io.ObjectInputStream;
059    import java.io.ObjectOutputStream;
060    import java.io.Serializable;
061    
062    import java.util.Iterator;
063    import java.util.List;
064    import java.util.Map;
065    import java.util.Set;
066    import java.util.concurrent.ConcurrentHashMap;
067    import java.util.concurrent.TimeUnit;
068    import java.util.concurrent.TimeoutException;
069    import java.util.concurrent.locks.ReadWriteLock;
070    import java.util.concurrent.locks.ReentrantReadWriteLock;
071    
072    /**
073     * @author Tina Tian
074     */
075    public class ClusterSchedulerEngine
076            implements IdentifiableBean, SchedulerEngine,
077                               SchedulerEngineClusterManager {
078    
079            public static SchedulerEngine createClusterSchedulerEngine(
080                    SchedulerEngine schedulerEngine) {
081    
082                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084                    }
085    
086                    return schedulerEngine;
087            }
088    
089            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
090                    _schedulerEngine = schedulerEngine;
091            }
092    
093            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094            @Override
095            public void delete(String groupName) throws SchedulerException {
096                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
097    
098                    _readLock.lock();
099    
100                    try {
101                            if (memoryClusteredSlaveJob) {
102                                    removeMemoryClusteredJobs(groupName);
103                            }
104                            else {
105                                    _schedulerEngine.delete(groupName);
106                            }
107                    }
108                    finally {
109                            _readLock.unlock();
110                    }
111    
112                    setClusterableThreadLocal(groupName);
113            }
114    
115            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
116            @Override
117            public void delete(String jobName, String groupName)
118                    throws SchedulerException {
119    
120                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
121    
122                    _readLock.lock();
123    
124                    try {
125                            if (memoryClusteredSlaveJob) {
126                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
127                            }
128                            else {
129                                    _schedulerEngine.delete(jobName, groupName);
130                            }
131                    }
132                    finally {
133                            _readLock.unlock();
134                    }
135    
136                    setClusterableThreadLocal(groupName);
137            }
138    
139            @Override
140            public String getBeanIdentifier() {
141                    return _beanIdentifier;
142            }
143    
144            @Override
145            public SchedulerResponse getScheduledJob(String jobName, String groupName)
146                    throws SchedulerException {
147    
148                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
149                            groupName);
150    
151                    StorageType storageType = objectValuePair.getValue();
152    
153                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
154                            String masterAddressString = getMasterAddressString(false);
155    
156                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
157                                    return (SchedulerResponse)callMaster(
158                                            masterAddressString, _getScheduledJobMethodKey, jobName,
159                                            objectValuePair.getKey(), storageType);
160                            }
161                    }
162    
163                    _readLock.lock();
164    
165                    try {
166                            return _schedulerEngine.getScheduledJob(jobName, groupName);
167                    }
168                    finally {
169                            _readLock.unlock();
170                    }
171            }
172    
173            @Override
174            public List<SchedulerResponse> getScheduledJobs()
175                    throws SchedulerException {
176    
177                    String masterAddressString = getMasterAddressString(false);
178    
179                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
180                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
181                    }
182    
183                    _readLock.lock();
184    
185                    try {
186                            return _schedulerEngine.getScheduledJobs();
187                    }
188                    finally {
189                            _readLock.unlock();
190                    }
191            }
192    
193            @Override
194            public List<SchedulerResponse> getScheduledJobs(String groupName)
195                    throws SchedulerException {
196    
197                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
198                            groupName);
199    
200                    StorageType storageType = objectValuePair.getValue();
201    
202                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
203                            String masterAddressString = getMasterAddressString(false);
204    
205                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
206                                    return callMaster(
207                                            masterAddressString, _getScheduledJobsMethodKey2,
208                                            objectValuePair.getKey(), storageType);
209                            }
210                    }
211    
212                    _readLock.lock();
213    
214                    try {
215                            return _schedulerEngine.getScheduledJobs(groupName);
216                    }
217                    finally {
218                            _readLock.unlock();
219                    }
220            }
221    
222            @Override
223            public void initialize() throws SchedulerException {
224                    try {
225                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
226    
227                            _readLock = readWriteLock.readLock();
228                            _writeLock = readWriteLock.writeLock();
229    
230                            _localClusterNodeAddress = getSerializedString(
231                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
232    
233                            _clusterEventListener = new MemorySchedulerClusterEventListener();
234    
235                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
236    
237                            String masterAddressString = getMasterAddressString(false);
238    
239                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
240                                    List<SchedulerResponse> schedulerResponses = callMaster(
241                                            masterAddressString, _getScheduledJobsMethodKey3,
242                                            StorageType.MEMORY_CLUSTERED);
243    
244                                    initMemoryClusteredJobs(schedulerResponses);
245                            }
246                    }
247                    catch (Exception e) {
248                            throw new SchedulerException("Unable to initialize scheduler", e);
249                    }
250            }
251    
252            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
253            @Override
254            public void pause(String groupName) throws SchedulerException {
255                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
256    
257                    _readLock.lock();
258    
259                    try {
260                            if (memoryClusteredSlaveJob) {
261                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
262                            }
263                            else {
264                                    _schedulerEngine.pause(groupName);
265                            }
266                    }
267                    finally {
268                            _readLock.unlock();
269                    }
270    
271                    setClusterableThreadLocal(groupName);
272            }
273    
274            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
275            @Override
276            public void pause(String jobName, String groupName)
277                    throws SchedulerException {
278    
279                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
280    
281                    _readLock.lock();
282    
283                    try {
284                            if (memoryClusteredSlaveJob) {
285                                    updateMemoryClusteredJob(
286                                            jobName, groupName, TriggerState.PAUSED);
287                            }
288                            else {
289                                    _schedulerEngine.pause(jobName, groupName);
290                            }
291                    }
292                    finally {
293                            _readLock.unlock();
294                    }
295    
296                    setClusterableThreadLocal(groupName);
297            }
298    
299            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
300            @Override
301            public void resume(String groupName) throws SchedulerException {
302                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
303    
304                    _readLock.lock();
305    
306                    try {
307                            if (memoryClusteredSlaveJob) {
308                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
309                            }
310                            else {
311                                    _schedulerEngine.resume(groupName);
312                            }
313                    }
314                    finally {
315                            _readLock.unlock();
316                    }
317    
318                    setClusterableThreadLocal(groupName);
319            }
320    
321            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
322            @Override
323            public void resume(String jobName, String groupName)
324                    throws SchedulerException {
325    
326                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
327    
328                    _readLock.lock();
329    
330                    try {
331                            if (memoryClusteredSlaveJob) {
332                                    updateMemoryClusteredJob(
333                                            jobName, groupName, TriggerState.NORMAL);
334                            }
335                            else {
336                                    _schedulerEngine.resume(jobName, groupName);
337                            }
338                    }
339                    finally {
340                            _readLock.unlock();
341                    }
342    
343                    setClusterableThreadLocal(groupName);
344            }
345    
346            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
347            @Override
348            public void schedule(
349                            Trigger trigger, String description, String destinationName,
350                            Message message)
351                    throws SchedulerException {
352    
353                    String groupName = trigger.getGroupName();
354                    String jobName = trigger.getJobName();
355    
356                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
357    
358                    _readLock.lock();
359    
360                    try {
361                            if (memoryClusteredSlaveJob) {
362                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
363    
364                                    schedulerResponse.setDescription(description);
365                                    schedulerResponse.setDestinationName(destinationName);
366                                    schedulerResponse.setGroupName(groupName);
367                                    schedulerResponse.setJobName(jobName);
368                                    schedulerResponse.setMessage(message);
369                                    schedulerResponse.setTrigger(trigger);
370    
371                                    _memoryClusteredJobs.put(
372                                            getFullName(jobName, groupName),
373                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
374                                                    schedulerResponse, TriggerState.NORMAL));
375                            }
376                            else {
377                                    _schedulerEngine.schedule(
378                                            trigger, description, destinationName, message);
379                            }
380                    }
381                    finally {
382                            _readLock.unlock();
383                    }
384    
385                    setClusterableThreadLocal(groupName);
386            }
387    
388            @Override
389            public void setBeanIdentifier(String beanIdentifier) {
390                    _beanIdentifier = beanIdentifier;
391            }
392    
393            @Override
394            public void shutdown() throws SchedulerException {
395                    _portalReady = false;
396    
397                    try {
398                            ClusterExecutorUtil.removeClusterEventListener(
399                                    _clusterEventListener);
400    
401                            LockLocalServiceUtil.unlock(
402                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
403                    }
404                    catch (Exception e) {
405                            throw new SchedulerException("Unable to shutdown scheduler", e);
406                    }
407    
408                    _schedulerEngine.shutdown();
409            }
410    
411            @Override
412            public void start() throws SchedulerException {
413                    _schedulerEngine.start();
414    
415                    _portalReady = true;
416            }
417    
418            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
419            @Override
420            public void suppressError(String jobName, String groupName)
421                    throws SchedulerException {
422    
423                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
424    
425                    if (!memoryClusteredSlaveJob) {
426                            _readLock.lock();
427    
428                            try {
429                                    _schedulerEngine.suppressError(jobName, groupName);
430                            }
431                            finally {
432                                    _readLock.unlock();
433                            }
434                    }
435    
436                    setClusterableThreadLocal(groupName);
437            }
438    
439            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
440            @Override
441            public void unschedule(String groupName) throws SchedulerException {
442                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
443    
444                    _readLock.lock();
445    
446                    try {
447                            if (memoryClusteredSlaveJob) {
448                                    removeMemoryClusteredJobs(groupName);
449                            }
450                            else {
451                                    _schedulerEngine.unschedule(groupName);
452                            }
453                    }
454                    finally {
455                            _readLock.unlock();
456                    }
457    
458                    setClusterableThreadLocal(groupName);
459            }
460    
461            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
462            @Override
463            public void unschedule(String jobName, String groupName)
464                    throws SchedulerException {
465    
466                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
467    
468                    _readLock.lock();
469    
470                    try {
471                            if (memoryClusteredSlaveJob) {
472                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
473                            }
474                            else {
475                                    _schedulerEngine.unschedule(jobName, groupName);
476                            }
477                    }
478                    finally {
479                            _readLock.unlock();
480                    }
481    
482                    setClusterableThreadLocal(groupName);
483            }
484    
485            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
486            @Override
487            public void update(Trigger trigger) throws SchedulerException {
488                    String jobName = trigger.getJobName();
489                    String groupName = trigger.getGroupName();
490    
491                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
492    
493                    _readLock.lock();
494    
495                    try {
496                            if (memoryClusteredSlaveJob) {
497                                    boolean updated = false;
498    
499                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
500                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
501    
502                                            SchedulerResponse schedulerResponse =
503                                                    memoryClusteredJob.getKey();
504    
505                                            if (jobName.equals(schedulerResponse.getJobName()) &&
506                                                    groupName.equals(schedulerResponse.getGroupName())) {
507    
508                                                    schedulerResponse.setTrigger(trigger);
509    
510                                                    updated = true;
511    
512                                                    break;
513                                            }
514                                    }
515    
516                                    if (!updated) {
517                                            throw new SchedulerException(
518                                                    "Unable to update trigger for memory clustered job");
519                                    }
520                            }
521                            else {
522                                    _schedulerEngine.update(trigger);
523                            }
524                    }
525                    finally {
526                            _readLock.unlock();
527                    }
528    
529                    setClusterableThreadLocal(groupName);
530            }
531    
532            @Override
533            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
534                    getMasterAddressString(false);
535    
536                    return null;
537            }
538    
539            protected <T> T callMaster(
540                            String masterAddressString, MethodKey methodKey,
541                            Object... arguments)
542                    throws SchedulerException {
543    
544                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
545    
546                    Address address = (Address)getDeserializedObject(masterAddressString);
547    
548                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
549                            methodHandler, address);
550    
551                    try {
552                            FutureClusterResponses futureClusterResponses =
553                                    ClusterExecutorUtil.execute(clusterRequest);
554    
555                            ClusterNodeResponses clusterNodeResponses =
556                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
557    
558                            ClusterNodeResponse clusterNodeResponse =
559                                    clusterNodeResponses.getClusterResponse(address);
560    
561                            return (T)clusterNodeResponse.getResult();
562                    }
563                    catch (Exception e) {
564                            throw new SchedulerException(
565                                    "Unable to load scheduled jobs from cluster node " +
566                                            address.getDescription(),
567                                    e);
568                    }
569            }
570    
571            protected Object getDeserializedObject(String string)
572                    throws SchedulerException {
573    
574                    byte[] bytes = Base64.decode(string);
575    
576                    UnsyncByteArrayInputStream byteArrayInputStream =
577                            new UnsyncByteArrayInputStream(bytes);
578    
579                    ObjectInputStream objectInputStream = null;
580    
581                    try {
582                            objectInputStream = new ObjectInputStream(byteArrayInputStream);
583    
584                            Object object = objectInputStream.readObject();
585    
586                            return object;
587                    }
588                    catch (Exception e) {
589                            throw new SchedulerException(
590                                    "Unable to deserialize object from " + string, e);
591                    }
592                    finally {
593                            try {
594                                    objectInputStream.close();
595                            }
596                            catch (Exception e) {
597                            }
598                    }
599            }
600    
601            protected String getFullName(String jobName, String groupName) {
602                    return groupName.concat(StringPool.PERIOD).concat(jobName);
603            }
604    
605            protected String getMasterAddressString(boolean asynchronous)
606                    throws SchedulerException {
607    
608                    String owner = null;
609    
610                    Lock lock = null;
611    
612                    while (true) {
613                            try {
614                                    if (owner == null) {
615                                            lock = LockLocalServiceUtil.lock(
616                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
617                                                    _localClusterNodeAddress);
618                                    }
619                                    else {
620                                            lock = LockLocalServiceUtil.lock(
621                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
622                                                    _localClusterNodeAddress);
623                                    }
624    
625                                    Address address = (Address)getDeserializedObject(
626                                            lock.getOwner());
627    
628                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
629                                            break;
630                                    }
631                                    else {
632                                            owner = lock.getOwner();
633                                    }
634                            }
635                            catch (Exception e) {
636                                    if (_log.isWarnEnabled()) {
637                                            _log.warn(
638                                                    "Unable to obtain memory scheduler cluster lock. " +
639                                                            "Trying again.");
640                                    }
641                            }
642                    }
643    
644                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
645    
646                    if (master == _master) {
647                            return lock.getOwner();
648                    }
649    
650                    if (master) {
651                            slaveToMaster();
652                    }
653                    else {
654                            masterToSlave(lock.getOwner(), asynchronous);
655                    }
656    
657                    return lock.getOwner();
658            }
659    
660            protected String getSerializedString(Object object) throws Exception {
661                    UnsyncByteArrayOutputStream byteArrayOutputStream =
662                            new UnsyncByteArrayOutputStream();
663    
664                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
665                            byteArrayOutputStream);
666    
667                    objectOutputStream.writeObject(object);
668                    objectOutputStream.close();
669    
670                    byte[] bytes = byteArrayOutputStream.toByteArray();
671    
672                    return Base64.encode(bytes);
673            }
674    
675            protected void initMemoryClusteredJobs(
676                            List<SchedulerResponse> schedulerResponses)
677                    throws Exception {
678    
679                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
680                            Trigger oldTrigger = schedulerResponse.getTrigger();
681    
682                            String jobName = schedulerResponse.getJobName();
683                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
684                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
685    
686                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
687                                    oldTrigger.getTriggerType(), jobName, groupName,
688                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
689                                    oldTrigger.getTriggerContent());
690    
691                            schedulerResponse.setTrigger(newTrigger);
692    
693                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
694                                    schedulerResponse);
695    
696                            Message message = schedulerResponse.getMessage();
697    
698                            message.remove(JOB_STATE);
699    
700                            _memoryClusteredJobs.put(
701                                    getFullName(jobName, groupName),
702                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
703                                            schedulerResponse, triggerState));
704                    }
705            }
706    
707            protected boolean isMemoryClusteredSlaveJob(String groupName)
708                    throws SchedulerException {
709    
710                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
711                            groupName);
712    
713                    StorageType storageType = objectValuePair.getValue();
714    
715                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
716                            return false;
717                    }
718    
719                    String masterAddressString = getMasterAddressString(false);
720    
721                    if (_localClusterNodeAddress.equals(masterAddressString)) {
722                            return false;
723                    }
724    
725                    return true;
726            }
727    
728            protected void masterToSlave(
729                            String masterAddressString, boolean asynchronous)
730                    throws SchedulerException {
731    
732                    if (asynchronous) {
733                            MethodHandler methodHandler = new MethodHandler(
734                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
735    
736                            Address address = (Address)getDeserializedObject(
737                                    masterAddressString);
738    
739                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
740                                    methodHandler, address);
741    
742                            try {
743                                    ClusterExecutorUtil.execute(
744                                            clusterRequest,
745                                            new MemorySchedulerClusterResponseCallback(address), 20,
746                                            TimeUnit.SECONDS);
747    
748                                    return;
749                            }
750                            catch (Exception e) {
751                                    throw new SchedulerException(
752                                            "Unable to load scheduled jobs from cluster node " +
753                                                    address.getDescription(),
754                                            e);
755                            }
756                    }
757    
758                    List<SchedulerResponse> schedulerResponses = callMaster(
759                            masterAddressString, _getScheduledJobsMethodKey3,
760                            StorageType.MEMORY_CLUSTERED);
761    
762                    _doMasterToSlave(schedulerResponses);
763            }
764    
765            protected void removeMemoryClusteredJobs(String groupName) {
766                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
767                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
768    
769                    Iterator
770                            <Map.Entry<String,
771                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
772                                            memoryClusteredJobs.iterator();
773    
774                    while (itr.hasNext()) {
775                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
776                                    entry = itr.next();
777    
778                            ObjectValuePair<SchedulerResponse, TriggerState>
779                                    memoryClusteredJob = entry.getValue();
780    
781                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
782    
783                            if (groupName.equals(schedulerResponse.getGroupName())) {
784                                    itr.remove();
785                            }
786                    }
787            }
788    
789            protected ObjectValuePair<String, StorageType> resolveGroupName(
790                    String groupName) {
791    
792                    int index = groupName.indexOf(CharPool.POUND);
793    
794                    String storageTypeString = groupName.substring(0, index);
795    
796                    StorageType storageType = StorageType.valueOf(storageTypeString);
797    
798                    String orginalGroupName = groupName.substring(index + 1);
799    
800                    return new ObjectValuePair<String, StorageType>(
801                            orginalGroupName, storageType);
802            }
803    
804            protected void setClusterableThreadLocal(String groupName) {
805                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
806                            groupName);
807    
808                    ClusterableContextThreadLocal.putThreadLocalContext(
809                            STORAGE_TYPE, objectValuePair.getValue());
810                    ClusterableContextThreadLocal.putThreadLocalContext(
811                            _PORTAL_READY, _portalReady);
812    
813                    boolean pluginReady = true;
814    
815                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
816                            PluginContextLifecycleThreadLocal.isDestroying()) {
817    
818                            pluginReady = false;
819                    }
820    
821                    ClusterableContextThreadLocal.putThreadLocalContext(
822                            _PLUGIN_READY, pluginReady);
823            }
824    
825            protected void slaveToMaster() throws SchedulerException {
826                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
827    
828                    ProxyModeThreadLocal.setForceSync(true);
829    
830                    _writeLock.lock();
831    
832                    try {
833                            for (ObjectValuePair<SchedulerResponse, TriggerState>
834                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
835    
836                                    SchedulerResponse schedulerResponse =
837                                            memoryClusteredJob.getKey();
838    
839                                    _schedulerEngine.schedule(
840                                            schedulerResponse.getTrigger(),
841                                            schedulerResponse.getDescription(),
842                                            schedulerResponse.getDestinationName(),
843                                            schedulerResponse.getMessage());
844    
845                                    TriggerState triggerState = memoryClusteredJob.getValue();
846    
847                                    if (triggerState.equals(TriggerState.PAUSED)) {
848                                            _schedulerEngine.pause(
849                                                    schedulerResponse.getJobName(),
850                                                    schedulerResponse.getGroupName());
851                                    }
852                            }
853    
854                            _memoryClusteredJobs.clear();
855                    }
856                    finally {
857                            ProxyModeThreadLocal.setForceSync(forceSync);
858    
859                            _master = true;
860    
861                            _writeLock.unlock();
862                    }
863            }
864    
865            protected void updateMemoryClusteredJob(
866                    String jobName, String groupName, TriggerState triggerState) {
867    
868                    ObjectValuePair<SchedulerResponse, TriggerState>
869                            memoryClusteredJob = _memoryClusteredJobs.get(
870                                    getFullName(jobName, groupName));
871    
872                    if (memoryClusteredJob != null) {
873                            memoryClusteredJob.setValue(triggerState);
874                    }
875            }
876    
877            protected void updateMemoryClusteredJobs(
878                    String groupName, TriggerState triggerState) {
879    
880                    for (ObjectValuePair<SchedulerResponse, TriggerState>
881                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
882    
883                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
884    
885                            if (groupName.equals(schedulerResponse.getGroupName())) {
886                                    memoryClusteredJob.setValue(triggerState);
887                            }
888                    }
889            }
890    
891            @BeanReference(
892                    name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
893            protected SchedulerEngine schedulerEngine;
894    
895            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
896                    throws SchedulerException {
897    
898                    _writeLock.lock();
899    
900                    try {
901                            for (SchedulerResponse schedulerResponse :
902                                            _schedulerEngine.getScheduledJobs()) {
903    
904                                    if (StorageType.MEMORY_CLUSTERED ==
905                                                    schedulerResponse.getStorageType()) {
906    
907                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
908    
909                                            groupName = groupName.concat(StringPool.POUND).concat(
910                                                    schedulerResponse.getGroupName());
911    
912                                            _schedulerEngine.delete(
913                                                    schedulerResponse.getJobName(), groupName);
914                                    }
915                            }
916    
917                            initMemoryClusteredJobs(schedulerResponses);
918    
919                            if (_log.isInfoEnabled()) {
920                                    _log.info("Switched current node from master to slave");
921                            }
922                    }
923                    catch (Exception e) {
924                            throw new SchedulerException(e);
925                    }
926                    finally {
927                            _master = false;
928    
929                            _writeLock.unlock();
930                    }
931            }
932    
933            private static final String _LOCK_CLASS_NAME =
934                    SchedulerEngine.class.getName();
935    
936            private static final String _PLUGIN_READY = "plugin.ready";
937    
938            private static final String _PORTAL_READY = "portal.ready";
939    
940            private static Log _log = LogFactoryUtil.getLog(
941                    ClusterSchedulerEngine.class);
942    
943            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
944                    SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
945                    String.class, StorageType.class);
946            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
947                    SchedulerEngineHelperUtil.class, "getScheduledJobs");
948            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
949                    SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
950                    StorageType.class);
951            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
952                    SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
953    
954            private String _beanIdentifier;
955            private ClusterEventListener _clusterEventListener;
956            private volatile String _localClusterNodeAddress;
957            private volatile boolean _master;
958            private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
959                    _memoryClusteredJobs = new ConcurrentHashMap
960                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
961            private boolean _portalReady;
962            private java.util.concurrent.locks.Lock _readLock;
963            private SchedulerEngine _schedulerEngine;
964            private java.util.concurrent.locks.Lock _writeLock;
965    
966            private static class SchedulerClusterInvokeAcceptor
967                    implements ClusterInvokeAcceptor {
968    
969                    @Override
970                    public boolean accept(Map<String, Serializable> context) {
971                            if (ClusterInvokeThreadLocal.isEnabled()) {
972                                    return true;
973                            }
974    
975                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
976                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
977                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
978    
979                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
980                                    !pluginReady) {
981    
982                                    return false;
983                            }
984    
985                            return true;
986                    }
987    
988            }
989    
990            private class MemorySchedulerClusterEventListener
991                    implements ClusterEventListener {
992    
993                    @Override
994                    public void processClusterEvent(ClusterEvent clusterEvent) {
995                            try {
996                                    getMasterAddressString(true);
997                            }
998                            catch (Exception e) {
999                                    _log.error("Unable to update memory scheduler cluster lock", e);
1000                            }
1001                    }
1002    
1003            }
1004    
1005            private class MemorySchedulerClusterResponseCallback
1006                    extends BaseClusterResponseCallback {
1007    
1008                    public MemorySchedulerClusterResponseCallback(Address address) {
1009                            _address = address;
1010                    }
1011    
1012                    @Override
1013                    public void callback(ClusterNodeResponses clusterNodeResponses) {
1014                            try {
1015                                    ClusterNodeResponse clusterNodeResponse =
1016                                            clusterNodeResponses.getClusterResponse(_address);
1017    
1018                                    List<SchedulerResponse> schedulerResponses =
1019                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
1020    
1021                                    _doMasterToSlave(schedulerResponses);
1022                            }
1023                            catch (Exception e) {
1024                                    _log.error(
1025                                            "Unable to load memory clustered jobs from cluster node " +
1026                                                    _address.getDescription(),
1027                                            e);
1028                            }
1029                    }
1030    
1031                    @Override
1032                    public void processTimeoutException(TimeoutException timeoutException) {
1033                            _log.error(
1034                                    "Unable to load memory clustered jobs from cluster node " +
1035                                            _address.getDescription(),
1036                                    timeoutException);
1037                    }
1038    
1039                    private Address _address;
1040    
1041            }
1042    
1043    }