001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
028 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
029 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
030 import com.liferay.portal.kernel.exception.SystemException;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.util.MethodHandler;
034 import com.liferay.portal.kernel.util.Validator;
035 import com.liferay.portal.model.Lock;
036 import com.liferay.portal.service.LockLocalServiceUtil;
037
038 import java.util.HashSet;
039 import java.util.Set;
040
041
044 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045
046 public void destroy() {
047 if (!_enabled) {
048 return;
049 }
050
051 try {
052 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053
054 LockLocalServiceUtil.unlock(
055 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056 }
057 catch (SystemException se) {
058 if (_log.isWarnEnabled()) {
059 _log.warn("Unable to destroy the cluster master executor", se);
060 }
061 }
062 }
063
064 @Override
065 public <T> NoticeableFuture<T> executeOnMaster(
066 MethodHandler methodHandler) {
067
068 if (!_enabled) {
069 if (_log.isWarnEnabled()) {
070 _log.warn(
071 "Executing on the local node because the cluster master " +
072 "executor is disabled");
073 }
074
075 DefaultNoticeableFuture<T> defaultNoticeableFuture =
076 new DefaultNoticeableFuture<T>();
077
078 try {
079 defaultNoticeableFuture.set((T)methodHandler.invoke());
080
081 return defaultNoticeableFuture;
082 }
083 catch (Exception e) {
084 throw new SystemException(e);
085 }
086 }
087
088 String masterAddressString = getMasterAddressString();
089
090 final Address address = AddressSerializerUtil.deserialize(
091 masterAddressString);
092
093 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
094 methodHandler, address);
095
096 try {
097 return new NoticeableFutureConverter<T, ClusterNodeResponses>(
098 _clusterExecutor.execute(clusterRequest)) {
099
100 @Override
101 protected T convert(
102 ClusterNodeResponses clusterNodeResponses)
103 throws Exception {
104
105 ClusterNodeResponse clusterNodeResponse =
106 clusterNodeResponses.getClusterResponse(address);
107
108 return (T)clusterNodeResponse.getResult();
109 }
110
111 };
112 }
113 catch (Exception e) {
114 throw new SystemException(
115 "Unable to execute on master " + address.getDescription(), e);
116 }
117 }
118
119 @Override
120 public void initialize() {
121 if (!_clusterExecutor.isEnabled()) {
122 return;
123 }
124
125 _localClusterNodeAddress = AddressSerializerUtil.serialize(
126 _clusterExecutor.getLocalClusterNodeAddress());
127
128 _clusterEventListener = new ClusterMasterTokenClusterEventListener();
129
130 _clusterExecutor.addClusterEventListener(_clusterEventListener);
131
132 String masterAddressString = getMasterAddressString();
133
134 _enabled = true;
135
136 notifyMasterTokenTransitionListeners(
137 _localClusterNodeAddress.equals(masterAddressString));
138 }
139
140 @Override
141 public boolean isEnabled() {
142 return _enabled;
143 }
144
145 @Override
146 public boolean isMaster() {
147 if (isEnabled()) {
148 return _master;
149 }
150
151 return true;
152 }
153
154 @Override
155 public void registerClusterMasterTokenTransitionListener(
156 ClusterMasterTokenTransitionListener
157 clusterMasterTokenTransitionListener) {
158
159 _clusterMasterTokenTransitionListeners.add(
160 clusterMasterTokenTransitionListener);
161 }
162
163 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
164 _clusterExecutor = clusterExecutor;
165 }
166
167 public void setClusterMasterTokenTransitionListeners(
168 Set<ClusterMasterTokenTransitionListener>
169 clusterMasterTokenTransitionListeners) {
170
171 _clusterMasterTokenTransitionListeners.addAll(
172 clusterMasterTokenTransitionListeners);
173 }
174
175 @Override
176 public void unregisterClusterMasterTokenTransitionListener(
177 ClusterMasterTokenTransitionListener
178 clusterMasterTokenTransitionListener) {
179
180 _clusterMasterTokenTransitionListeners.remove(
181 clusterMasterTokenTransitionListener);
182 }
183
184 protected String getMasterAddressString() {
185 String owner = null;
186
187 while (true) {
188 try {
189 Lock lock = null;
190
191 if (owner == null) {
192 lock = LockLocalServiceUtil.lock(
193 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
194 _localClusterNodeAddress);
195 }
196 else {
197 lock = LockLocalServiceUtil.lock(
198 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
199 _localClusterNodeAddress);
200 }
201
202 owner = lock.getOwner();
203
204 Address address = AddressSerializerUtil.deserialize(owner);
205
206 if (_clusterExecutor.isClusterNodeAlive(address)) {
207 break;
208 }
209 }
210 catch (Exception e) {
211 if (_log.isWarnEnabled()) {
212 _log.warn("Unable to acquire the cluster master lock", e);
213 }
214 }
215
216 if (_log.isInfoEnabled()) {
217 if (Validator.isNotNull(owner)) {
218 _log.info("Lock currently held by " + owner);
219 }
220
221 _log.info("Reattempting to acquire the cluster master lock");
222 }
223 }
224
225 boolean master = _localClusterNodeAddress.equals(owner);
226
227 if (master == _master) {
228 return owner;
229 }
230
231 _master = master;
232
233 if (_enabled) {
234 notifyMasterTokenTransitionListeners(master);
235 }
236
237 return owner;
238 }
239
240 protected void notifyMasterTokenTransitionListeners(
241 boolean masterTokenAcquired) {
242
243 for (ClusterMasterTokenTransitionListener
244 clusterMasterTokenTransitionListener :
245 _clusterMasterTokenTransitionListeners) {
246
247 if (masterTokenAcquired) {
248 clusterMasterTokenTransitionListener.masterTokenAcquired();
249 }
250 else {
251 clusterMasterTokenTransitionListener.masterTokenReleased();
252 }
253 }
254 }
255
256 private static final String _LOCK_CLASS_NAME =
257 ClusterMasterExecutorImpl.class.getName();
258
259 private static final Log _log = LogFactoryUtil.getLog(
260 ClusterMasterExecutorImpl.class);
261
262 private static volatile boolean _master;
263
264 private ClusterEventListener _clusterEventListener;
265 private ClusterExecutor _clusterExecutor;
266 private final Set<ClusterMasterTokenTransitionListener>
267 _clusterMasterTokenTransitionListeners =
268 new HashSet<ClusterMasterTokenTransitionListener>();
269 private volatile boolean _enabled;
270 private volatile String _localClusterNodeAddress;
271
272 private class ClusterMasterTokenClusterEventListener
273 implements ClusterEventListener {
274
275 @Override
276 public void processClusterEvent(ClusterEvent clusterEvent) {
277 getMasterAddressString();
278 }
279
280 }
281
282 }