001
014
015 package com.liferay.portal.kernel.resiliency.spi.remote;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021 import com.liferay.portal.kernel.nio.intraband.welder.Welder;
022 import com.liferay.portal.kernel.nio.intraband.welder.WelderFactoryUtil;
023 import com.liferay.portal.kernel.process.ProcessCallable;
024 import com.liferay.portal.kernel.process.ProcessException;
025 import com.liferay.portal.kernel.process.local.LocalProcessLauncher;
026 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
027 import com.liferay.portal.kernel.resiliency.mpi.MPI;
028 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
029 import com.liferay.portal.kernel.resiliency.spi.SPI;
030 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
031 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
032 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgentFactoryUtil;
033 import com.liferay.portal.kernel.resiliency.spi.provider.SPISynchronousQueueUtil;
034 import com.liferay.portal.kernel.util.PropsKeys;
035
036 import java.io.IOException;
037 import java.io.ObjectInputStream;
038 import java.io.ObjectOutputStream;
039
040 import java.rmi.Remote;
041 import java.rmi.RemoteException;
042 import java.rmi.server.UnicastRemoteObject;
043
044 import java.util.UUID;
045 import java.util.concurrent.ConcurrentMap;
046 import java.util.concurrent.CountDownLatch;
047 import java.util.concurrent.Future;
048 import java.util.concurrent.TimeUnit;
049
050
053 public abstract class RemoteSPI implements ProcessCallable<SPI>, Remote, SPI {
054
/**
 * Creates a remote SPI for the given configuration.
 *
 * <p>
 * In addition to storing the configuration, the constructor captures the
 * MPI returned by {@link MPIHelperUtil#getMPI()}, generates a random UUID
 * string for this instance, and creates the welder that {@link #call()}
 * later welds.
 * </p>
 *
 * @param spiConfiguration the configuration of this SPI
 */
public RemoteSPI(SPIConfiguration spiConfiguration) {
    this.spiConfiguration = spiConfiguration;
    this.mpi = MPIHelperUtil.getMPI();
    this.uuid = UUID.randomUUID().toString();
    this.welder = WelderFactoryUtil.createWelder();
}
066
067 @Override
068 public SPI call() throws ProcessException {
069 try {
070 SPIShutdownHook spiShutdownHook = new SPIShutdownHook();
071
072 LocalProcessLauncher.ProcessContext.attach(
073 spiConfiguration.getSPIId(), spiConfiguration.getPingInterval(),
074 spiShutdownHook);
075
076 Runtime runtime = Runtime.getRuntime();
077
078 runtime.addShutdownHook(spiShutdownHook);
079
080 SPI spi = (SPI)UnicastRemoteObject.exportObject(this, 0);
081
082 RegisterCallback registerCallback = new RegisterCallback(uuid, spi);
083
084 ProcessOutputStream processOutputStream =
085 LocalProcessLauncher.ProcessContext.getProcessOutputStream();
086
087 processOutputStream.writeProcessCallable(registerCallback);
088
089 registrationReference = welder.weld(MPIHelperUtil.getIntraband());
090
091 ConcurrentMap<String, Object> attributes =
092 LocalProcessLauncher.ProcessContext.getAttributes();
093
094 attributes.put(SPI.SPI_INSTANCE_PUBLICATION_KEY, this);
095
096 return spi;
097 }
098 catch (RemoteException re) {
099 throw new ProcessException("Failed to export SPI as RMI stub.", re);
100 }
101 catch (IOException ioe) {
102 throw new ProcessException(ioe);
103 }
104 }
105
106 @Override
107 public void destroy() throws RemoteException {
108 try {
109 doDestroy();
110
111 if (countDownLatch != null) {
112 countDownLatch.countDown();
113 }
114 }
115 finally {
116 UnicastRemoteObject.unexportObject(RemoteSPI.this, true);
117 }
118 }
119
120 @Override
121 public MPI getMPI() {
122 return mpi;
123 }
124
125 @Override
126 public RegistrationReference getRegistrationReference() {
127 return registrationReference;
128 }
129
130 @Override
131 public SPIAgent getSPIAgent() {
132 if (spiAgent == null) {
133 spiAgent = SPIAgentFactoryUtil.createSPIAgent(
134 spiConfiguration, registrationReference);
135 }
136
137 return spiAgent;
138 }
139
140 @Override
141 public SPIConfiguration getSPIConfiguration() {
142 return spiConfiguration;
143 }
144
145 public String getUUID() {
146 return uuid;
147 }
148
149 public Welder getWelder() {
150 return welder;
151 }
152
153 @Override
154 public boolean isAlive() {
155 return true;
156 }
157
/**
 * Performs the subclass-specific part of destroying this SPI.
 *
 * <p>
 * Called first by {@link #destroy()}, before the shutdown latch is counted
 * down and before this object is unexported from RMI.
 * </p>
 *
 * @throws RemoteException if the SPI cannot be destroyed; the exception is
 *         not caught by {@code destroy()}
 */
158 protected abstract void doDestroy() throws RemoteException;
159
// Latch created with a count of one in readObject, counted down by
// destroy(), and awaited by SPIShutdownHook. The constructor does not
// assign it, so it stays null until readObject has run.
160 protected transient CountDownLatch countDownLatch;
// MPI obtained from MPIHelperUtil.getMPI() in the constructor.
161 protected final MPI mpi;
// Result of welder.weld(...), assigned in call().
162 protected RegistrationReference registrationReference;
// Agent created on first use by getSPIAgent().
163 protected transient volatile SPIAgent spiAgent;
// Configuration passed to the constructor.
164 protected final SPIConfiguration spiConfiguration;
// String form of a random UUID generated in the constructor; passed to
// RegisterCallback in call().
165 protected final String uuid;
// Welder created by WelderFactoryUtil.createWelder() in the constructor.
166 protected final Welder welder;
167
168 protected static class RegisterCallback implements ProcessCallable<SPI> {
169
170 public RegisterCallback(String spiUUID, SPI spi) {
171 _spiUUID = spiUUID;
172 _spi = spi;
173 }
174
175 @Override
176 public SPI call() throws ProcessException {
177 try {
178 SPISynchronousQueueUtil.notifySynchronousQueue(_spiUUID, _spi);
179 }
180 catch (InterruptedException ie) {
181 throw new ProcessException(ie);
182 }
183
184 return _spi;
185 }
186
187 private static final long serialVersionUID = 1L;
188
189 private final SPI _spi;
190 private final String _spiUUID;
191
192 }
193
194 protected static class UnregisterSPIProcessCallable
195 implements ProcessCallable<Boolean> {
196
197 public UnregisterSPIProcessCallable(
198 String spiProviderName, String spiId) {
199
200 _spiProviderName = spiProviderName;
201 _spiId = spiId;
202 }
203
204 @Override
205 public Boolean call() {
206 SPI spi = MPIHelperUtil.getSPI(_spiProviderName, _spiId);
207
208 if (spi != null) {
209 return MPIHelperUtil.unregisterSPI(spi);
210 }
211
212 return false;
213 }
214
215 private static final long serialVersionUID = 1L;
216
217 private final String _spiId;
218 private final String _spiProviderName;
219
220 }
221
222 protected class SPIShutdownHook
223 extends Thread implements LocalProcessLauncher.ShutdownHook {
224
225 public SPIShutdownHook() {
226 setDaemon(true);
227 setName(SPIShutdownHook.class.getSimpleName());
228 }
229
230 @Override
231 public void run() {
232 if (countDownLatch.getCount() == 0) {
233 return;
234 }
235
236 boolean unregistered = false;
237
238 try {
239 Future<Boolean> future = IntrabandRPCUtil.execute(
240 registrationReference,
241 new UnregisterSPIProcessCallable(
242 getSPIProviderName(), spiConfiguration.getSPIId()));
243
244 unregistered = future.get();
245 }
246 catch (Exception e) {
247 if (_log.isWarnEnabled()) {
248 _log.warn("Unable to unregister SPI from MPI", e);
249 }
250 }
251
252 if (unregistered || !waitForMPI()) {
253 doShutdown();
254 }
255 }
256
257 @Override
258 public boolean shutdown(int shutdownCode, Throwable shutdownThrowable) {
259 Runtime runtime = Runtime.getRuntime();
260
261 runtime.removeShutdownHook(this);
262
263 doShutdown();
264
265 return true;
266 }
267
268 private void doShutdown() {
269 try {
270 RemoteSPI.this.stop();
271 }
272 catch (RemoteException re) {
273 _log.error("Unable to stop SPI", re);
274 }
275
276 try {
277 RemoteSPI.this.destroy();
278 }
279 catch (RemoteException re) {
280 _log.error("Unable to destroy SPI", re);
281 }
282 }
283
284 private boolean waitForMPI() {
285 if (_log.isInfoEnabled()) {
286 _log.info(
287 "Wait up to " + spiConfiguration.getShutdownTimeout() +
288 " ms for MPI shutdown request");
289 }
290
291 try {
292 if (countDownLatch.await(
293 spiConfiguration.getShutdownTimeout(),
294 TimeUnit.MILLISECONDS)) {
295
296 if (_log.isInfoEnabled()) {
297 _log.info("MPI shutdown request received");
298 }
299
300 return true;
301 }
302 }
303 catch (InterruptedException ie) {
304 }
305
306 if (_log.isInfoEnabled()) {
307 _log.info("Proceed with SPI shutdown");
308 }
309
310 return false;
311 }
312
313 }
314
315 private void readObject(ObjectInputStream objectInputStream)
316 throws ClassNotFoundException, IOException {
317
318 objectInputStream.defaultReadObject();
319
320 System.setProperty(
321 PropsKeys.INTRABAND_IMPL, objectInputStream.readUTF());
322 System.setProperty(
323 PropsKeys.INTRABAND_TIMEOUT_DEFAULT, objectInputStream.readUTF());
324 System.setProperty(
325 PropsKeys.INTRABAND_WELDER_IMPL, objectInputStream.readUTF());
326 System.setProperty(
327 "portal:" + PropsKeys.LIFERAY_HOME, objectInputStream.readUTF());
328
329
330
331 System.setProperty("portal:" + PropsKeys.AUTO_DEPLOY_ENABLED, "false");
332
333
334
335 System.setProperty("portal:" + PropsKeys.CLUSTER_LINK_ENABLED, "false");
336
337
338
339 System.setProperty(
340 "portal:" + PropsKeys.HOT_DEPLOY_DEPENDENCY_MANAGEMENT_ENABLED,
341 "false");
342
343
344
345 System.setProperty("spi.id", "-" + spiConfiguration.getSPIId());
346
347 countDownLatch = new CountDownLatch(1);
348 }
349
350 private void writeObject(ObjectOutputStream objectOutputStream)
351 throws IOException {
352
353 objectOutputStream.defaultWriteObject();
354
355 objectOutputStream.writeUTF(
356 System.getProperty(PropsKeys.INTRABAND_IMPL));
357 objectOutputStream.writeUTF(
358 System.getProperty(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
359 objectOutputStream.writeUTF(
360 System.getProperty(PropsKeys.INTRABAND_WELDER_IMPL));
361 objectOutputStream.writeUTF(System.getProperty(PropsKeys.LIFERAY_HOME));
362 }
363
// Logger for this class; also used by the SPIShutdownHook inner class.
364 private static final Log _log = LogFactoryUtil.getLog(RemoteSPI.class);
365
366 }