001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
027 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
028 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.log.Log;
031 import com.liferay.portal.kernel.log.LogFactoryUtil;
032 import com.liferay.portal.kernel.util.MethodHandler;
033 import com.liferay.portal.kernel.util.Validator;
034 import com.liferay.portal.model.Lock;
035 import com.liferay.portal.service.LockLocalServiceUtil;
036
037 import java.util.HashSet;
038 import java.util.Set;
039
040
043 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044
045 public void destroy() {
046 if (!_enabled) {
047 return;
048 }
049
050 try {
051 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
052
053 LockLocalServiceUtil.unlock(
054 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
055 }
056 catch (SystemException se) {
057 if (_log.isWarnEnabled()) {
058 _log.warn("Unable to destroy the cluster master executor", se);
059 }
060 }
061 }
062
063 @Override
064 public <T> NoticeableFuture<T> executeOnMaster(
065 MethodHandler methodHandler) {
066
067 if (!_enabled) {
068 if (_log.isWarnEnabled()) {
069 _log.warn(
070 "Executing on the local node because the cluster master " +
071 "executor is disabled");
072 }
073
074 DefaultNoticeableFuture<T> defaultNoticeableFuture =
075 new DefaultNoticeableFuture<T>();
076
077 try {
078 defaultNoticeableFuture.set((T)methodHandler.invoke());
079
080 return defaultNoticeableFuture;
081 }
082 catch (Exception e) {
083 throw new SystemException(e);
084 }
085 }
086
087 String masterAddressString = getMasterAddressString();
088
089 final Address address = AddressSerializerUtil.deserialize(
090 masterAddressString);
091
092 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
093 methodHandler, address);
094
095 try {
096 return new NoticeableFutureConverter<T, ClusterNodeResponses>(
097 _clusterExecutor.execute(clusterRequest)) {
098
099 @Override
100 protected T convert(
101 ClusterNodeResponses clusterNodeResponses) {
102
103 return (T)clusterNodeResponses.getClusterResponse(
104 address);
105 }
106
107 };
108 }
109 catch (Exception e) {
110 throw new SystemException(
111 "Unable to execute on master " + address.getDescription(), e);
112 }
113 }
114
115 @Override
116 public void initialize() {
117 if (!_clusterExecutor.isEnabled()) {
118 return;
119 }
120
121 try {
122 _localClusterNodeAddress = AddressSerializerUtil.serialize(
123 _clusterExecutor.getLocalClusterNodeAddress());
124
125 _clusterEventListener =
126 new ClusterMasterTokenClusterEventListener();
127
128 _clusterExecutor.addClusterEventListener(_clusterEventListener);
129
130 String masterAddressString = getMasterAddressString();
131
132 _enabled = true;
133
134 notifyMasterTokenTransitionListeners(
135 _localClusterNodeAddress.equals(masterAddressString));
136 }
137 catch (Exception e) {
138 throw new RuntimeException(
139 "Unable to initialize cluster master executor", e);
140 }
141 }
142
143 @Override
144 public boolean isEnabled() {
145 return _enabled;
146 }
147
148 @Override
149 public boolean isMaster() {
150 return _master;
151 }
152
153 @Override
154 public void registerClusterMasterTokenTransitionListener(
155 ClusterMasterTokenTransitionListener
156 clusterMasterTokenTransitionListener) {
157
158 _clusterMasterTokenTransitionListeners.add(
159 clusterMasterTokenTransitionListener);
160 }
161
162 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
163 _clusterExecutor = clusterExecutor;
164 }
165
166 public void setClusterMasterTokenTransitionListeners(
167 Set<ClusterMasterTokenTransitionListener>
168 clusterMasterTokenTransitionListeners) {
169
170 _clusterMasterTokenTransitionListeners.addAll(
171 clusterMasterTokenTransitionListeners);
172 }
173
174 @Override
175 public void unregisterClusterMasterTokenTransitionListener(
176 ClusterMasterTokenTransitionListener
177 clusterMasterTokenTransitionListener) {
178
179 _clusterMasterTokenTransitionListeners.remove(
180 clusterMasterTokenTransitionListener);
181 }
182
183 protected String getMasterAddressString() {
184 String owner = null;
185
186 while (true) {
187 try {
188 Lock lock = null;
189
190 if (owner == null) {
191 lock = LockLocalServiceUtil.lock(
192 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
193 _localClusterNodeAddress);
194 }
195 else {
196 lock = LockLocalServiceUtil.lock(
197 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
198 _localClusterNodeAddress);
199 }
200
201 owner = lock.getOwner();
202
203 Address address = AddressSerializerUtil.deserialize(owner);
204
205 if (_clusterExecutor.isClusterNodeAlive(address)) {
206 break;
207 }
208 }
209 catch (Exception e) {
210 if (_log.isWarnEnabled()) {
211 _log.warn(
212 "Unable to acquire memory scheduler cluster lock", e);
213 }
214 }
215
216 if (_log.isInfoEnabled()) {
217 if (Validator.isNotNull(owner)) {
218 _log.info("Lock currently held by " + owner);
219 }
220
221 _log.info(
222 "Reattempting to acquire memory scheduler cluster lock");
223 }
224 }
225
226 boolean master = _localClusterNodeAddress.equals(owner);
227
228 if (master == _master) {
229 return owner;
230 }
231
232 _master = master;
233
234 if (_enabled) {
235 notifyMasterTokenTransitionListeners(master);
236 }
237
238 return owner;
239 }
240
241 protected void notifyMasterTokenTransitionListeners(
242 boolean masterTokenAcquired) {
243
244 for (ClusterMasterTokenTransitionListener
245 clusterMasterTokenTransitionListener :
246 _clusterMasterTokenTransitionListeners) {
247
248 if (masterTokenAcquired) {
249 clusterMasterTokenTransitionListener.masterTokenAcquired();
250 }
251 else {
252 clusterMasterTokenTransitionListener.masterTokenReleased();
253 }
254 }
255 }
256
257 private static final String _LOCK_CLASS_NAME =
258 ClusterMasterExecutorImpl.class.getName();
259
260 private static final Log _log = LogFactoryUtil.getLog(
261 ClusterMasterExecutorImpl.class);
262
263 private static volatile boolean _master;
264
265 private ClusterEventListener _clusterEventListener;
266 private ClusterExecutor _clusterExecutor;
267 private final Set<ClusterMasterTokenTransitionListener>
268 _clusterMasterTokenTransitionListeners =
269 new HashSet<ClusterMasterTokenTransitionListener>();
270 private volatile boolean _enabled;
271 private volatile String _localClusterNodeAddress;
272
273 private class ClusterMasterTokenClusterEventListener
274 implements ClusterEventListener {
275
276 @Override
277 public void processClusterEvent(ClusterEvent clusterEvent) {
278 try {
279 getMasterAddressString();
280 }
281 catch (Exception e) {
282 _log.error("Unable to update the cluster master lock", e);
283 }
284 }
285 }
286
287 }