001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018    import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterMessageType;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.util.MethodHandler;
029    import com.liferay.portal.kernel.util.Validator;
030    import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
031    
032    import java.io.Serializable;
033    
034    import java.util.ArrayList;
035    import java.util.Collections;
036    import java.util.List;
037    import java.util.concurrent.CountDownLatch;
038    
039    import org.jgroups.Channel;
040    import org.jgroups.ChannelException;
041    import org.jgroups.Message;
042    import org.jgroups.View;
043    
044    /**
045     * @author Michael C. Han
046     * @author Tina Tian
047     */
048    public class ClusterRequestReceiver extends BaseReceiver {
049    
050            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
051                    _countDownLatch = new CountDownLatch(1);
052                    _clusterExecutorImpl = clusterExecutorImpl;
053            }
054    
055            public void openLatch() {
056                    _countDownLatch.countDown();
057            }
058    
059            @Override
060            public void receive(Message message) {
061                    try {
062                            _countDownLatch.await();
063                    }
064                    catch (InterruptedException ie) {
065                            _log.error(
066                                    "Latch opened prematurely by interruption. Dependence may " +
067                                            "not be ready.");
068                    }
069    
070                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
071    
072                    org.jgroups.Address localAddress = controlChannel.getAddress();
073    
074                    Object obj = message.getObject();
075    
076                    if (obj == null) {
077                            if (_log.isWarnEnabled()) {
078                                    _log.warn("Message content is null");
079                            }
080    
081                            return;
082                    }
083    
084                    org.jgroups.Address sourceAddress = message.getSrc();
085    
086                    if (localAddress.equals(sourceAddress)) {
087                            boolean isProcessed = processLocalMessage(obj, sourceAddress);
088    
089                            if (isProcessed) {
090                                    return;
091                            }
092                    }
093    
094                    if (obj instanceof ClusterRequest) {
095                            ClusterRequest clusterRequest = (ClusterRequest)obj;
096    
097                            processClusterRequest(clusterRequest, sourceAddress, localAddress);
098                    }
099                    else if (obj instanceof ClusterNodeResponse) {
100                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
101    
102                            processClusterResponse(
103                                    clusterNodeResponse, sourceAddress, localAddress);
104                    }
105                    else if (_log.isWarnEnabled()) {
106                            _log.warn(
107                                    "Unable to process message content of type " + obj.getClass());
108                    }
109            }
110    
111            @Override
112            public void viewAccepted(View view) {
113                    super.viewAccepted(view);
114    
115                    if (_lastView == null) {
116                            _lastView = view;
117    
118                            return;
119                    }
120    
121                    List<Address> departAddresses = getDepartAddresses(view);
122                    List<Address> newAddresses = getNewAddresses(view);
123    
124                    _lastView = view;
125    
126                    try {
127                            _countDownLatch.await();
128                    }
129                    catch (InterruptedException ie) {
130                            _log.error(
131                                    "Latch opened prematurely by interruption. Dependence may " +
132                                            "not be ready.");
133                    }
134    
135                    if (!newAddresses.isEmpty()) {
136                            _clusterExecutorImpl.sendNotifyRequest();
137                    }
138    
139                    if (!departAddresses.isEmpty()) {
140                            _clusterExecutorImpl.memberRemoved(departAddresses);
141                    }
142            }
143    
144            protected List<Address> getDepartAddresses(View view) {
145                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
146                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
147    
148                    List<org.jgroups.Address> departJGroupsAddresses =
149                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
150    
151                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
152    
153                    if (departJGroupsAddresses.isEmpty()) {
154                            return Collections.emptyList();
155                    }
156    
157                    List<Address> departAddresses = new ArrayList<Address>(
158                            departJGroupsAddresses.size());
159    
160                    for (org.jgroups.Address departJGroupsAddress :
161                                    departJGroupsAddresses) {
162    
163                            Address departAddress = new AddressImpl(departJGroupsAddress);
164    
165                            departAddresses.add(departAddress);
166                    }
167    
168                    return departAddresses;
169            }
170    
171            protected List<Address> getNewAddresses(View view) {
172                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
173                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
174    
175                    List<org.jgroups.Address> newJGroupsAddresses =
176                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
177    
178                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
179    
180                    if (newJGroupsAddresses.isEmpty()) {
181                            return Collections.emptyList();
182                    }
183    
184                    List<Address> newAddresses = new ArrayList<Address>(
185                            newJGroupsAddresses.size());
186    
187                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
188                            Address newAddress = new AddressImpl(newJGroupsAddress);
189    
190                            newAddresses.add(newAddress);
191                    }
192    
193                    return newAddresses;
194            }
195    
196            protected Object invoke(
197                            String servletContextName, String beanIdentifier,
198                            MethodHandler methodHandler)
199                    throws Exception {
200    
201                    if (servletContextName == null) {
202                            if (Validator.isNull(beanIdentifier)) {
203                                    return methodHandler.invoke(true);
204                            }
205                            else {
206                                    Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
207    
208                                    return methodHandler.invoke(bean);
209                            }
210                    }
211    
212                    ClassLoader contextClassLoader =
213                            PACLClassLoaderUtil.getContextClassLoader();
214    
215                    try {
216                            ClassLoader classLoader =
217                                    (ClassLoader)PortletBeanLocatorUtil.locate(
218                                            servletContextName, "portletClassLoader");
219    
220                            PACLClassLoaderUtil.setContextClassLoader(classLoader);
221    
222                            if (Validator.isNull(beanIdentifier)) {
223                                    return methodHandler.invoke(true);
224                            }
225                            else {
226                                    Object bean = PortletBeanLocatorUtil.locate(
227                                            servletContextName, beanIdentifier);
228    
229                                    return methodHandler.invoke(bean);
230                            }
231                    }
232                    catch (Exception e) {
233                            throw e;
234                    }
235                    finally {
236                            PACLClassLoaderUtil.setContextClassLoader(contextClassLoader);
237                    }
238            }
239    
240            protected void processClusterRequest(
241                    ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
242                    org.jgroups.Address localAddress) {
243    
244                    ClusterMessageType clusterMessageType =
245                            clusterRequest.getClusterMessageType();
246    
247                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
248    
249                    Address address = new AddressImpl(localAddress);
250    
251                    clusterNodeResponse.setAddress(address);
252    
253                    ClusterNode localClusterNode =
254                            _clusterExecutorImpl.getLocalClusterNode();
255    
256                    clusterNodeResponse.setClusterNode(localClusterNode);
257    
258                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
259                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
260    
261                            ClusterNode originatingClusterNode =
262                                    clusterRequest.getOriginatingClusterNode();
263    
264                            if (originatingClusterNode != null) {
265                                    _clusterExecutorImpl.memberJoined(
266                                            new AddressImpl(sourceAddress), originatingClusterNode);
267    
268                                    clusterNodeResponse.setClusterMessageType(clusterMessageType);
269                            }
270                            else {
271                                    if (_log.isWarnEnabled()) {
272                                            _log.warn(
273                                                    "Content of notify message does not contain cluster " +
274                                                            "node information");
275                                    }
276    
277                                    return;
278                            }
279                    }
280                    else {
281                            clusterNodeResponse.setClusterMessageType(
282                                    ClusterMessageType.EXECUTE);
283                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
284                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
285    
286                            MethodHandler methodHandler = clusterRequest.getMethodHandler();
287    
288                            if (methodHandler != null) {
289                                    try {
290                                            ClusterInvokeThreadLocal.setEnabled(false);
291    
292                                            Object returnValue = invoke(
293                                                    clusterRequest.getServletContextName(),
294                                                    clusterRequest.getBeanIdentifier(), methodHandler);
295    
296                                            if (returnValue instanceof Serializable) {
297                                                    clusterNodeResponse.setResult(returnValue);
298                                            }
299                                            else if (returnValue != null) {
300                                                    clusterNodeResponse.setException(
301                                                            new ClusterException(
302                                                                    "Return value is not serializable"));
303                                            }
304                                    }
305                                    catch (Exception e) {
306                                            clusterNodeResponse.setException(e);
307    
308                                            _log.error("Failed to invoke method " + methodHandler, e);
309                                    }
310                                    finally {
311                                            ClusterInvokeThreadLocal.setEnabled(true);
312                                    }
313                            }
314                            else {
315                                    clusterNodeResponse.setException(
316                                            new ClusterException(
317                                                    "Payload is not of type " +
318                                                            MethodHandler.class.getName()));
319                            }
320                    }
321    
322                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
323    
324                    try {
325                            controlChannel.send(
326                                    sourceAddress, localAddress, clusterNodeResponse);
327                    }
328                    catch (ChannelException ce) {
329                            _log.error(
330                                    "Unable to send response message " + clusterNodeResponse, ce);
331                    }
332                    catch (Throwable t) {
333                            _log.error(t, t);
334                    }
335            }
336    
337            protected void processClusterResponse(
338                    ClusterNodeResponse clusterNodeResponse,
339                    org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
340    
341                    ClusterMessageType clusterMessageType =
342                            clusterNodeResponse.getClusterMessageType();
343    
344                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
345                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
346    
347                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
348    
349                            if (clusterNode != null) {
350                                    Address joinAddress = new AddressImpl(sourceAddress);
351    
352                                    _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
353                            }
354                            else {
355                                    if (_log.isWarnEnabled()) {
356                                            _log.warn(
357                                                    "Response of notify message does not contain cluster " +
358                                                            "node information");
359                                    }
360                            }
361    
362                            return;
363                    }
364    
365                    String uuid = clusterNodeResponse.getUuid();
366    
367                    FutureClusterResponses futureClusterResponses =
368                            _clusterExecutorImpl.getExecutionResults(uuid);
369    
370                    if (futureClusterResponses == null) {
371                            if (_log.isInfoEnabled()) {
372                                    _log.info("Unable to find response container for " + uuid);
373                            }
374    
375                            return;
376                    }
377    
378                    Address address = new AddressImpl(sourceAddress);
379    
380                    if (futureClusterResponses.expectsReply(address)) {
381                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
382                    }
383                    else {
384                            if (_log.isWarnEnabled()) {
385                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
386                            }
387                    }
388            }
389    
390            protected boolean processLocalMessage(
391                    Object message, org.jgroups.Address sourceAddress) {
392    
393                    if (message instanceof ClusterRequest) {
394                            ClusterRequest clusterRequest = (ClusterRequest)message;
395    
396                            if (clusterRequest.isSkipLocal()) {
397                                    return true;
398                            }
399                    }
400    
401                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
402                            return true;
403                    }
404    
405                    return false;
406            }
407    
408            private static Log _log = LogFactoryUtil.getLog(
409                    ClusterRequestReceiver.class);
410    
411            private ClusterExecutorImpl _clusterExecutorImpl;
412            private CountDownLatch _countDownLatch;
413            private volatile View _lastView;
414    
415    }