001
014
015 package com.liferay.portal.kernel.resiliency.spi.remote;
016
017 import com.liferay.portal.kernel.deploy.hot.DependencyManagementThreadLocal;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
022 import com.liferay.portal.kernel.nio.intraband.welder.Welder;
023 import com.liferay.portal.kernel.nio.intraband.welder.WelderFactoryUtil;
024 import com.liferay.portal.kernel.process.ProcessCallable;
025 import com.liferay.portal.kernel.process.ProcessException;
026 import com.liferay.portal.kernel.process.local.LocalProcessLauncher;
027 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
028 import com.liferay.portal.kernel.resiliency.mpi.MPI;
029 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
030 import com.liferay.portal.kernel.resiliency.spi.SPI;
031 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
032 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgent;
033 import com.liferay.portal.kernel.resiliency.spi.agent.SPIAgentFactoryUtil;
034 import com.liferay.portal.kernel.resiliency.spi.provider.SPISynchronousQueueUtil;
035 import com.liferay.portal.kernel.util.PropsKeys;
036 import com.liferay.portal.kernel.util.ReflectionUtil;
037
038 import java.io.IOException;
039 import java.io.ObjectInputStream;
040 import java.io.ObjectOutputStream;
041
042 import java.lang.reflect.Field;
043
044 import java.rmi.Remote;
045 import java.rmi.RemoteException;
046 import java.rmi.server.UnicastRemoteObject;
047
048 import java.util.UUID;
049 import java.util.concurrent.ConcurrentMap;
050 import java.util.concurrent.CountDownLatch;
051 import java.util.concurrent.Future;
052 import java.util.concurrent.TimeUnit;
053
054
057 public abstract class RemoteSPI implements ProcessCallable<SPI>, Remote, SPI {
058
059 public RemoteSPI(SPIConfiguration spiConfiguration) {
060 this.spiConfiguration = spiConfiguration;
061
062 mpi = MPIHelperUtil.getMPI();
063
064 UUID uuidObject = UUID.randomUUID();
065
066 uuid = uuidObject.toString();
067
068 welder = WelderFactoryUtil.createWelder();
069 }
070
071 @Override
072 public SPI call() throws ProcessException {
073 try {
074 SPIShutdownHook spiShutdownHook = new SPIShutdownHook();
075
076 LocalProcessLauncher.ProcessContext.attach(
077 spiConfiguration.getSPIId(), spiConfiguration.getPingInterval(),
078 spiShutdownHook);
079
080 Runtime runtime = Runtime.getRuntime();
081
082 runtime.addShutdownHook(spiShutdownHook);
083
084 SPI spi = (SPI)UnicastRemoteObject.exportObject(this, 0);
085
086 RegisterCallback registerCallback = new RegisterCallback(uuid, spi);
087
088 ProcessOutputStream processOutputStream =
089 LocalProcessLauncher.ProcessContext.getProcessOutputStream();
090
091 processOutputStream.writeProcessCallable(registerCallback);
092
093 registrationReference = welder.weld(MPIHelperUtil.getIntraband());
094
095 ConcurrentMap<String, Object> attributes =
096 LocalProcessLauncher.ProcessContext.getAttributes();
097
098 attributes.put(SPI.SPI_INSTANCE_PUBLICATION_KEY, this);
099
100 return spi;
101 }
102 catch (RemoteException re) {
103 throw new ProcessException("Failed to export SPI as RMI stub.", re);
104 }
105 catch (IOException ioe) {
106 throw new ProcessException(ioe);
107 }
108 }
109
110 @Override
111 public void destroy() throws RemoteException {
112 try {
113 doDestroy();
114
115 if (countDownLatch != null) {
116 countDownLatch.countDown();
117 }
118 }
119 finally {
120 UnicastRemoteObject.unexportObject(RemoteSPI.this, true);
121 }
122 }
123
124 @Override
125 public MPI getMPI() {
126 return mpi;
127 }
128
129 @Override
130 public RegistrationReference getRegistrationReference() {
131 return registrationReference;
132 }
133
134 @Override
135 public SPIAgent getSPIAgent() {
136 if (spiAgent == null) {
137 spiAgent = SPIAgentFactoryUtil.createSPIAgent(
138 spiConfiguration, registrationReference);
139 }
140
141 return spiAgent;
142 }
143
144 @Override
145 public SPIConfiguration getSPIConfiguration() {
146 return spiConfiguration;
147 }
148
149 public String getUUID() {
150 return uuid;
151 }
152
153 public Welder getWelder() {
154 return welder;
155 }
156
157 @Override
158 public boolean isAlive() {
159 return true;
160 }
161
162 protected abstract void doDestroy() throws RemoteException;
163
164 protected transient CountDownLatch countDownLatch;
165 protected final MPI mpi;
166 protected RegistrationReference registrationReference;
167 protected transient volatile SPIAgent spiAgent;
168 protected final SPIConfiguration spiConfiguration;
169 protected final String uuid;
170 protected final Welder welder;
171
172 protected static class RegisterCallback implements ProcessCallable<SPI> {
173
174 public RegisterCallback(String spiUUID, SPI spi) {
175 _spiUUID = spiUUID;
176 _spi = spi;
177 }
178
179 @Override
180 public SPI call() throws ProcessException {
181 try {
182 SPISynchronousQueueUtil.notifySynchronousQueue(_spiUUID, _spi);
183 }
184 catch (InterruptedException ie) {
185 throw new ProcessException(ie);
186 }
187
188 return _spi;
189 }
190
191 private static final long serialVersionUID = 1L;
192
193 private final SPI _spi;
194 private final String _spiUUID;
195
196 }
197
198 protected static class UnregisterSPIProcessCallable
199 implements ProcessCallable<Boolean> {
200
201 public UnregisterSPIProcessCallable(
202 String spiProviderName, String spiId) {
203
204 _spiProviderName = spiProviderName;
205 _spiId = spiId;
206 }
207
208 @Override
209 public Boolean call() {
210 SPI spi = MPIHelperUtil.getSPI(_spiProviderName, _spiId);
211
212 if (spi != null) {
213 return MPIHelperUtil.unregisterSPI(spi);
214 }
215
216 return false;
217 }
218
219 private static final long serialVersionUID = 1L;
220
221 private final String _spiId;
222 private final String _spiProviderName;
223
224 }
225
226 protected class SPIShutdownHook
227 extends Thread implements LocalProcessLauncher.ShutdownHook {
228
229 public SPIShutdownHook() {
230 setDaemon(true);
231 setName(SPIShutdownHook.class.getSimpleName());
232 }
233
234 @Override
235 public void run() {
236 if (countDownLatch.getCount() == 0) {
237 return;
238 }
239
240 boolean unregistered = false;
241
242 try {
243 Future<Boolean> future = IntrabandRPCUtil.execute(
244 registrationReference,
245 new UnregisterSPIProcessCallable(
246 getSPIProviderName(), spiConfiguration.getSPIId()));
247
248 unregistered = future.get();
249 }
250 catch (Exception e) {
251 if (_log.isWarnEnabled()) {
252 _log.warn("Unable to unregister SPI from MPI", e);
253 }
254 }
255
256 if (unregistered || !waitForMPI()) {
257 doShutdown();
258 }
259 }
260
261 @Override
262 public boolean shutdown(int shutdownCode, Throwable shutdownThrowable) {
263 Runtime runtime = Runtime.getRuntime();
264
265 runtime.removeShutdownHook(this);
266
267 doShutdown();
268
269 return true;
270 }
271
272 private void doShutdown() {
273 try {
274 RemoteSPI.this.stop();
275 }
276 catch (RemoteException re) {
277 _log.error("Unable to stop SPI", re);
278 }
279
280 try {
281 RemoteSPI.this.destroy();
282 }
283 catch (RemoteException re) {
284 _log.error("Unable to destroy SPI", re);
285 }
286 }
287
288 private boolean waitForMPI() {
289 if (_log.isInfoEnabled()) {
290 _log.info(
291 "Wait up to " + spiConfiguration.getShutdownTimeout() +
292 " ms for MPI shutdown request");
293 }
294
295 try {
296 if (countDownLatch.await(
297 spiConfiguration.getShutdownTimeout(),
298 TimeUnit.MILLISECONDS)) {
299
300 if (_log.isInfoEnabled()) {
301 _log.info("MPI shutdown request received");
302 }
303
304 return true;
305 }
306 }
307 catch (InterruptedException ie) {
308 }
309
310 if (_log.isInfoEnabled()) {
311 _log.info("Proceed with SPI shutdown");
312 }
313
314 return false;
315 }
316
317 }
318
319 private void readObject(ObjectInputStream objectInputStream)
320 throws ClassNotFoundException, IOException {
321
322 objectInputStream.defaultReadObject();
323
324 System.setProperty(
325 PropsKeys.INTRABAND_IMPL, objectInputStream.readUTF());
326 System.setProperty(
327 PropsKeys.INTRABAND_TIMEOUT_DEFAULT, objectInputStream.readUTF());
328 System.setProperty(
329 PropsKeys.INTRABAND_WELDER_IMPL, objectInputStream.readUTF());
330 System.setProperty(
331 "portal:" + PropsKeys.LIFERAY_HOME, objectInputStream.readUTF());
332
333
334
335 System.setProperty("portal:" + PropsKeys.AUTO_DEPLOY_ENABLED, "false");
336
337
338
339 System.setProperty("portal:" + PropsKeys.CLUSTER_LINK_ENABLED, "false");
340
341
342
343 try {
344 Field enabledField = ReflectionUtil.getDeclaredField(
345 DependencyManagementThreadLocal.class, "_enabled");
346
347 enabledField.set(
348 null,
349 new ThreadLocal<Boolean>() {
350
351 @Override
352 public Boolean get() {
353 return Boolean.FALSE;
354 }
355
356 });
357 }
358 catch (Exception e) {
359 throw new IOException("Unable to disable dependency management", e);
360 }
361
362
363
364 System.setProperty("spi.id", "-" + spiConfiguration.getSPIId());
365
366 countDownLatch = new CountDownLatch(1);
367 }
368
369 private void writeObject(ObjectOutputStream objectOutputStream)
370 throws IOException {
371
372 objectOutputStream.defaultWriteObject();
373
374 objectOutputStream.writeUTF(
375 System.getProperty(PropsKeys.INTRABAND_IMPL));
376 objectOutputStream.writeUTF(
377 System.getProperty(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
378 objectOutputStream.writeUTF(
379 System.getProperty(PropsKeys.INTRABAND_WELDER_IMPL));
380 objectOutputStream.writeUTF(System.getProperty(PropsKeys.LIFERAY_HOME));
381 }
382
383 private static final Log _log = LogFactoryUtil.getLog(RemoteSPI.class);
384
385 }