001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.io.Deserializer;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.InputStream;
022 import java.io.OutputStream;
023
024 import java.nio.ByteBuffer;
025
026 import java.util.List;
027 import java.util.concurrent.CountDownLatch;
028
029 import org.jgroups.Address;
030 import org.jgroups.Message;
031 import org.jgroups.Receiver;
032 import org.jgroups.View;
033
034
037 public abstract class BaseReceiver implements Receiver {
038
039 @Override
040 public void block() {
041 }
042
043 @Override
044 public void getState(OutputStream outputStream) throws Exception {
045 }
046
047 public View getView() {
048 return _view;
049 }
050
051 public Address getCoordinator() {
052 return _coordinator;
053 }
054
055 public void openLatch() {
056 _countDownLatch.countDown();
057 }
058
059 @Override
060 public void receive(Message message) {
061 try {
062 _countDownLatch.await();
063 }
064 catch (InterruptedException ie) {
065 _log.error(
066 "Latch opened prematurely by interruption. Dependence may " +
067 "not be ready.");
068 }
069
070 doReceive(message);
071 }
072
073 @Override
074 public void setState(InputStream inputStream) throws Exception {
075 }
076
077 @Override
078 public void suspect(Address address) {
079 }
080
081 @Override
082 public void unblock() {
083 }
084
085 @Override
086 public void viewAccepted(View view) {
087 if (_log.isInfoEnabled()) {
088 _log.info("Accepted view " + view);
089 }
090
091 if (_view == null) {
092 _view = view;
093
094 List<Address> members = view.getMembers();
095
096 _coordinator = members.get(0);
097
098 return;
099 }
100
101 try {
102 _countDownLatch.await();
103 }
104 catch (InterruptedException ie) {
105 _log.error(
106 "Latch opened prematurely by interruption. Dependence may " +
107 "not be ready.");
108 }
109
110 View oldView = _view;
111
112 _view = view;
113
114 doViewAccepted(oldView, view);
115
116 List<Address> members = view.getMembers();
117
118 Address newCoordinator = members.get(0);
119
120 if (_coordinator.equals(newCoordinator)) {
121 return;
122 }
123
124 _coordinator = newCoordinator;
125
126 doCoordinatorUpdated(newCoordinator);
127 }
128
129 protected abstract void doReceive(Message message);
130
131 protected void doViewAccepted(View oldView, View newView) {
132 }
133
134 protected void doCoordinatorUpdated(Address coordinator) {
135 }
136
137 protected Object retrievePayload(Message message) {
138 byte[] rawBuffer = message.getRawBuffer();
139
140 if (rawBuffer == null) {
141 if (_log.isWarnEnabled()) {
142 _log.warn("Message content is null");
143 }
144
145 return null;
146 }
147
148 ByteBuffer byteBuffer = ByteBuffer.wrap(
149 rawBuffer, message.getOffset(), message.getLength());
150
151 Deserializer deserializer = new Deserializer(byteBuffer.slice());
152
153 try {
154 return deserializer.readObject();
155 }
156 catch (ClassNotFoundException cnfe) {
157 if (_log.isWarnEnabled()) {
158 _log.warn("Unable to deserialize message payload", cnfe);
159 }
160 }
161
162 return null;
163 }
164
165 private static Log _log = LogFactoryUtil.getLog(BaseReceiver.class);
166
167 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
168 private volatile View _view;
169 private volatile Address _coordinator;
170
171 }