001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.io.Deserializer;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.InputStream;
022    import java.io.OutputStream;
023    
024    import java.nio.ByteBuffer;
025    
026    import java.util.List;
027    import java.util.concurrent.CountDownLatch;
028    
029    import org.jgroups.Address;
030    import org.jgroups.Message;
031    import org.jgroups.Receiver;
032    import org.jgroups.View;
033    
034    /**
035     * @author Tina Tian
036     */
037    public abstract class BaseReceiver implements Receiver {
038    
039            @Override
040            public void block() {
041            }
042    
043            @Override
044            public void getState(OutputStream outputStream) throws Exception {
045            }
046    
047            public View getView() {
048                    return _view;
049            }
050    
051            public Address getCoordinator() {
052                    return _coordinator;
053            }
054    
055            public void openLatch() {
056                    _countDownLatch.countDown();
057            }
058    
059            @Override
060            public void receive(Message message) {
061                    try {
062                            _countDownLatch.await();
063                    }
064                    catch (InterruptedException ie) {
065                            _log.error(
066                                    "Latch opened prematurely by interruption. Dependence may " +
067                                            "not be ready.");
068                    }
069    
070                    doReceive(message);
071            }
072    
073            @Override
074            public void setState(InputStream inputStream) throws Exception {
075            }
076    
077            @Override
078            public void suspect(Address address) {
079            }
080    
081            @Override
082            public void unblock() {
083            }
084    
085            @Override
086            public void viewAccepted(View view) {
087                    if (_log.isInfoEnabled()) {
088                            _log.info("Accepted view " + view);
089                    }
090    
091                    if (_view == null) {
092                            _view = view;
093    
094                            List<Address> members = view.getMembers();
095    
096                            _coordinator = members.get(0);
097    
098                            return;
099                    }
100    
101                    try {
102                            _countDownLatch.await();
103                    }
104                    catch (InterruptedException ie) {
105                            _log.error(
106                                    "Latch opened prematurely by interruption. Dependence may " +
107                                            "not be ready.");
108                    }
109    
110                    View oldView = _view;
111    
112                    _view = view;
113    
114                    doViewAccepted(oldView, view);
115    
116                    List<Address> members = view.getMembers();
117    
118                    Address newCoordinator = members.get(0);
119    
120                    if (_coordinator.equals(newCoordinator)) {
121                            return;
122                    }
123    
124                    _coordinator = newCoordinator;
125    
126                    doCoordinatorUpdated(newCoordinator);
127            }
128    
129            protected abstract void doReceive(Message message);
130    
131            protected void doViewAccepted(View oldView, View newView) {
132            }
133    
134            protected void doCoordinatorUpdated(Address coordinator) {
135            }
136    
137            protected Object retrievePayload(Message message) {
138                    byte[] rawBuffer = message.getRawBuffer();
139    
140                    if (rawBuffer == null) {
141                            if (_log.isWarnEnabled()) {
142                                    _log.warn("Message content is null");
143                            }
144    
145                            return null;
146                    }
147    
148                    ByteBuffer byteBuffer = ByteBuffer.wrap(
149                            rawBuffer, message.getOffset(), message.getLength());
150    
151                    Deserializer deserializer = new Deserializer(byteBuffer.slice());
152    
153                    try {
154                            return deserializer.readObject();
155                    }
156                    catch (ClassNotFoundException cnfe) {
157                            if (_log.isWarnEnabled()) {
158                                    _log.warn("Unable to deserialize message payload", cnfe);
159                            }
160                    }
161    
162                    return null;
163            }
164    
165            private static Log _log = LogFactoryUtil.getLog(BaseReceiver.class);
166    
167            private final CountDownLatch _countDownLatch = new CountDownLatch(1);
168            private volatile View _view;
169            private volatile Address _coordinator;
170    
171    }