001
014
015 package com.liferay.portal.kernel.resiliency.mpi;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.config.MessagingConfigurator;
020 import com.liferay.portal.kernel.messaging.config.MessagingConfiguratorRegistry;
021 import com.liferay.portal.kernel.nio.intraband.Intraband;
022 import com.liferay.portal.kernel.nio.intraband.IntrabandFactoryUtil;
023 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
024 import com.liferay.portal.kernel.nio.intraband.rpc.BootstrapRPCDatagramReceiveHandler;
025 import com.liferay.portal.kernel.resiliency.spi.SPI;
026 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
027 import com.liferay.portal.kernel.resiliency.spi.SPIRegistryUtil;
028 import com.liferay.portal.kernel.resiliency.spi.provider.SPIProvider;
029 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
030 import com.liferay.portal.kernel.util.PropsKeys;
031 import com.liferay.portal.kernel.util.PropsUtil;
032
033 import java.rmi.NoSuchObjectException;
034 import java.rmi.RemoteException;
035 import java.rmi.server.UnicastRemoteObject;
036
037 import java.util.ArrayList;
038 import java.util.Collection;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.concurrent.ConcurrentHashMap;
042 import java.util.concurrent.ConcurrentMap;
043
044
047 public class MPIHelperUtil {
048
049 public static SPI checkSPILiveness(SPI spi) {
050 boolean alive = false;
051
052 try {
053 alive = spi.isAlive();
054 }
055 catch (RemoteException re) {
056 _log.error(re);
057 }
058
059 if (alive) {
060 return spi;
061 }
062
063 unregisterSPI(spi);
064
065 return null;
066 }
067
068 public static Intraband getIntraband() {
069 return _intraband;
070 }
071
072 public static MPI getMPI() {
073 return _mpi;
074 }
075
076 public static SPI getSPI(String spiProviderName, String spiId) {
077 SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
078 spiProviderName);
079
080 if (spiProviderContainer == null) {
081 return null;
082 }
083
084 SPI spi = spiProviderContainer.getSPI(spiId);
085
086 if (spi != null) {
087 spi = checkSPILiveness(spi);
088 }
089
090 return spi;
091 }
092
093 public static SPIProvider getSPIProvider(String spiProviderName) {
094 SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
095 spiProviderName);
096
097 if (spiProviderContainer == null) {
098 return null;
099 }
100
101 return spiProviderContainer.getSPIProvider();
102 }
103
104 public static List<SPIProvider> getSPIProviders() {
105 List<SPIProvider> spiProviders = new ArrayList<>();
106
107 for (SPIProviderContainer spiProviderContainer :
108 _spiProviderContainers.values()) {
109
110 spiProviders.add(spiProviderContainer.getSPIProvider());
111 }
112
113 return spiProviders;
114 }
115
116 public static List<SPI> getSPIs() {
117 List<SPI> spis = new ArrayList<>();
118
119 for (SPIProviderContainer spiProviderContainer :
120 _spiProviderContainers.values()) {
121
122 for (SPI spi : spiProviderContainer.getSPIs()) {
123 spi = checkSPILiveness(spi);
124
125 if (spi != null) {
126 spis.add(spi);
127 }
128 }
129 }
130
131 return spis;
132 }
133
134 public static List<SPI> getSPIs(String spiProviderName) {
135 List<SPI> spis = new ArrayList<>();
136
137 SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
138 spiProviderName);
139
140 if (spiProviderContainer != null) {
141 for (SPI spi : spiProviderContainer.getSPIs()) {
142 spi = checkSPILiveness(spi);
143
144 if (spi != null) {
145 spis.add(spi);
146 }
147 }
148 }
149
150 return spis;
151 }
152
153 public static boolean registerSPI(SPI spi) {
154 try {
155 MPI mpi = spi.getMPI();
156
157 if (mpi != _mpi) {
158 if (_log.isWarnEnabled()) {
159 _log.warn(
160 "Not registering SPI " + spi + " with foreign MPI " +
161 mpi + " versus " + _mpi);
162 }
163
164 return false;
165 }
166
167 String spiProviderName = spi.getSPIProviderName();
168
169 SPIProviderContainer spiProviderContainer =
170 _spiProviderContainers.get(spiProviderName);
171
172 if (spiProviderContainer == null) {
173 if (_log.isWarnEnabled()) {
174 _log.warn(
175 "Not registering SPI " + spi +
176 " with unknown SPI provider " + spiProviderName);
177 }
178
179 return false;
180 }
181
182 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
183
184 SPI previousSPI = spiProviderContainer.putSPIIfAbsent(
185 spiConfiguration.getSPIId(), spi);
186
187 if (previousSPI != null) {
188 if (_log.isWarnEnabled()) {
189 _log.warn(
190 "Not registering SPI " + spi +
191 " because it duplicates " + previousSPI);
192 }
193
194 return false;
195 }
196
197 SPIRegistryUtil.registerSPI(spi);
198
199 for (String servletContextName :
200 spiConfiguration.getServletContextNames()) {
201
202 List<MessagingConfigurator> messagingConfigurators =
203 MessagingConfiguratorRegistry.getMessagingConfigurators(
204 servletContextName);
205
206 if (messagingConfigurators != null) {
207 for (MessagingConfigurator messagingConfigurator :
208 messagingConfigurators) {
209
210 messagingConfigurator.disconnect();
211 }
212 }
213 }
214
215 if (_log.isInfoEnabled()) {
216 _log.info("Registered SPI " + spi);
217 }
218
219 return true;
220 }
221 catch (RemoteException re) {
222 throw new RuntimeException(re);
223 }
224 }
225
226 public static boolean registerSPIProvider(SPIProvider spiProvider) {
227 String spiProviderName = spiProvider.getName();
228
229 SPIProviderContainer previousSPIProviderContainer =
230 _spiProviderContainers.putIfAbsent(
231 spiProviderName, new SPIProviderContainer(spiProvider));
232
233 if (previousSPIProviderContainer != null) {
234 if (_log.isWarnEnabled()) {
235 _log.warn(
236 "Not registering SPI provider " + spiProvider +
237 " because it duplicates " +
238 previousSPIProviderContainer.getSPIProvider());
239 }
240
241 return false;
242 }
243
244 if (_log.isInfoEnabled()) {
245 _log.info("Registered SPI provider " + spiProvider);
246 }
247
248 return true;
249 }
250
251 public static void shutdown() {
252 try {
253 UnicastRemoteObject.unexportObject(_mpiImpl, true);
254 }
255 catch (NoSuchObjectException nsoe) {
256 if (_log.isWarnEnabled()) {
257 _log.warn("Unable to unexport " + _mpiImpl, nsoe);
258 }
259 }
260
261 try {
262 _intraband.close();
263 }
264 catch (Exception e) {
265 if (_log.isWarnEnabled()) {
266 _log.warn("Unable to close intraband", e);
267 }
268 }
269 }
270
271 public static boolean unregisterSPI(SPI spi) {
272 try {
273 if (spi == _unregisteringSPIThreadLocal.get()) {
274 _doUnregisterSPI(spi);
275
276 return true;
277 }
278
279 MPI mpi = spi.getMPI();
280
281 if (mpi != _mpi) {
282 if (_log.isWarnEnabled()) {
283 _log.warn(
284 "Not unregistering SPI " + spi + " with foreign MPI " +
285 mpi + " versus " + _mpi);
286 }
287
288 return false;
289 }
290
291 String spiProviderName = spi.getSPIProviderName();
292
293 SPIProviderContainer spiProviderContainer =
294 _spiProviderContainers.get(spiProviderName);
295
296 if (spiProviderContainer == null) {
297 if (_log.isWarnEnabled()) {
298 _log.warn(
299 "Not unregistering SPI " + spi +
300 " with unknown SPI provider " + spiProviderName);
301 }
302
303 return false;
304 }
305
306 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
307
308 if (spiProviderContainer.removeSPI(
309 spiConfiguration.getSPIId(), spi)) {
310
311 _doUnregisterSPI(spi);
312
313 return true;
314 }
315
316 if (_log.isWarnEnabled()) {
317 _log.warn("Not unregistering unregistered SPI " + spi);
318 }
319
320 return false;
321 }
322 catch (RemoteException re) {
323 throw new RuntimeException(re);
324 }
325 }
326
327 public static boolean unregisterSPIProvider(SPIProvider spiProvider) {
328 String spiProviderName = spiProvider.getName();
329
330 SPIProviderContainer spiProviderContainer = _spiProviderContainers.get(
331 spiProviderName);
332
333 if ((spiProviderContainer != null) &&
334 (spiProviderContainer.getSPIProvider() == spiProvider) &&
335 _spiProviderContainers.remove(
336 spiProviderName, spiProviderContainer)) {
337
338 Collection<SPI> spis = spiProviderContainer.getSPIs();
339
340 Iterator<SPI> iterator = spis.iterator();
341
342 while (iterator.hasNext()) {
343 SPI spi = iterator.next();
344
345 iterator.remove();
346
347 _unregisteringSPIThreadLocal.set(spi);
348
349 try {
350 spi.stop();
351
352 spi.destroy();
353
354 if (_log.isInfoEnabled()) {
355 _log.info(
356 "Unregistered SPI " + spi +
357 " while unregistering SPI provider " +
358 spiProvider);
359 }
360 }
361 catch (RemoteException re) {
362 _log.error(
363 "Unable to unregister SPI " + spi +
364 " while unregistering SPI provider " + spiProvider,
365 re);
366 }
367 finally {
368 _unregisteringSPIThreadLocal.remove();
369 }
370 }
371
372 if (_log.isInfoEnabled()) {
373 _log.info("Unregistered SPI provider " + spiProvider);
374 }
375
376 return true;
377 }
378
379 if (_log.isWarnEnabled()) {
380 _log.warn(
381 "Not unregistering unregistered SPI provider " + spiProvider);
382 }
383
384 return false;
385 }
386
387 private static void _doUnregisterSPI(SPI spi) throws RemoteException {
388 SPIRegistryUtil.unregisterSPI(spi);
389
390 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
391
392 for (String servletContextName :
393 spiConfiguration.getServletContextNames()) {
394
395 List<MessagingConfigurator> messagingConfigurators =
396 MessagingConfiguratorRegistry.getMessagingConfigurators(
397 servletContextName);
398
399 if (messagingConfigurators == null) {
400 continue;
401 }
402
403 for (MessagingConfigurator messagingConfigurator :
404 messagingConfigurators) {
405
406 messagingConfigurator.connect();
407 }
408 }
409
410 if (_log.isInfoEnabled()) {
411 _log.info("Unregistered SPI " + spi);
412 }
413 }
414
415 private static final Log _log = LogFactoryUtil.getLog(MPIHelperUtil.class);
416
417 private static final Intraband _intraband;
418 private static final MPI _mpi;
419 private static final MPI _mpiImpl;
420 private static final ConcurrentMap<String, SPIProviderContainer>
421 _spiProviderContainers = new ConcurrentHashMap<>();
422 private static final ThreadLocal<SPI> _unregisteringSPIThreadLocal =
423 new CentralizedThreadLocal<>(true);
424
425 static {
426
427
428
429 _mpiImpl = new MPIImpl();
430
431 try {
432 if (PropsUtil.getProps() != null) {
433 System.setProperty(
434 PropsKeys.INTRABAND_IMPL,
435 PropsUtil.get(PropsKeys.INTRABAND_IMPL));
436 System.setProperty(
437 PropsKeys.INTRABAND_TIMEOUT_DEFAULT,
438 PropsUtil.get(PropsKeys.INTRABAND_TIMEOUT_DEFAULT));
439 System.setProperty(
440 PropsKeys.INTRABAND_WELDER_IMPL,
441 PropsUtil.get(PropsKeys.INTRABAND_WELDER_IMPL));
442 }
443
444 _intraband = IntrabandFactoryUtil.createIntraband();
445
446 _intraband.registerDatagramReceiveHandler(
447 SystemDataType.RPC.getValue(),
448 new BootstrapRPCDatagramReceiveHandler());
449
450 _mpi = (MPI)UnicastRemoteObject.exportObject(_mpiImpl, 0);
451 }
452 catch (Exception e) {
453 throw new ExceptionInInitializerError(e);
454 }
455 }
456
457 private static class MPIImpl implements MPI {
458
459 @Override
460 public boolean isAlive() {
461 return true;
462 }
463
464 }
465
466 private static class SPIProviderContainer {
467
468 public SPIProviderContainer(SPIProvider spiProvider) {
469 _spiProvider = spiProvider;
470 }
471
472 public SPI getSPI(String spiId) {
473 return _spis.get(spiId);
474 }
475
476 public SPIProvider getSPIProvider() {
477 return _spiProvider;
478 }
479
480 public Collection<SPI> getSPIs() {
481 return _spis.values();
482 }
483
484 public SPI putSPIIfAbsent(String spiId, SPI spi) {
485 return _spis.putIfAbsent(spiId, spi);
486 }
487
488 public boolean removeSPI(String spiId, SPI spi) {
489 return _spis.remove(spiId, spi);
490 }
491
492 private final SPIProvider _spiProvider;
493 private final ConcurrentMap<String, SPI> _spis =
494 new ConcurrentHashMap<>();
495
496 }
497
498 }