001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019    import com.liferay.portal.kernel.cluster.ClusterRequest;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.notifications.Channel;
023    import com.liferay.portal.kernel.notifications.ChannelException;
024    import com.liferay.portal.kernel.notifications.ChannelHub;
025    import com.liferay.portal.kernel.notifications.ChannelHubManager;
026    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
027    import com.liferay.portal.kernel.notifications.ChannelListener;
028    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
029    import com.liferay.portal.kernel.notifications.NotificationEvent;
030    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
031    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
032    import com.liferay.portal.kernel.util.MethodHandler;
033    import com.liferay.portal.kernel.util.MethodKey;
034    
035    import java.util.Collection;
036    import java.util.Collections;
037    import java.util.List;
038    import java.util.concurrent.ConcurrentHashMap;
039    import java.util.concurrent.ConcurrentMap;
040    
041    /**
042     * @author Edward Han
043     * @author Brian Wing Shun
044     * @author Shuyang Zhou
045     */
046    @DoPrivileged
047    public class ChannelHubManagerImpl implements ChannelHubManager {
048    
049            @Override
050            public void confirmDelivery(
051                            long companyId, long userId,
052                            Collection<String> notificationEventUuids)
053                    throws ChannelException {
054    
055                    confirmDelivery(companyId, userId, notificationEventUuids, false);
056            }
057    
058            @Override
059            public void confirmDelivery(
060                            long companyId, long userId,
061                            Collection<String> notificationEventUuids, boolean archive)
062                    throws ChannelException {
063    
064                    ChannelHub channelHub = getChannelHub(companyId);
065    
066                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
067            }
068    
069            @Override
070            public void confirmDelivery(
071                            long companyId, long userId, String notificationEventUuid)
072                    throws ChannelException {
073    
074                    confirmDelivery(companyId, userId, notificationEventUuid, false);
075            }
076    
077            @Override
078            public void confirmDelivery(
079                            long companyId, long userId, String notificationEventUuid,
080                            boolean archive)
081                    throws ChannelException {
082    
083                    ChannelHub channelHub = getChannelHub(companyId);
084    
085                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
086            }
087    
088            @Override
089            public Channel createChannel(long companyId, long userId)
090                    throws ChannelException {
091    
092                    ChannelHub channelHub = getChannelHub(companyId);
093    
094                    return channelHub.createChannel(userId);
095            }
096    
097            @Override
098            public ChannelHub createChannelHub(long companyId) throws ChannelException {
099                    ChannelHub channelHub = new ChannelHubImpl(companyId);
100    
101                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
102                            throw new DuplicateChannelHubException(
103                                    "Channel already exists with company id " + companyId);
104                    }
105    
106                    return channelHub;
107            }
108    
109            @Override
110            public void deleteUserNotificiationEvent(
111                            long companyId, long userId, String notificationEventUuid)
112                    throws ChannelException {
113    
114                    ChannelHub channelHub = getChannelHub(companyId);
115    
116                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
117            }
118    
119            @Override
120            public void deleteUserNotificiationEvents(
121                            long companyId, long userId,
122                            Collection<String> notificationEventUuids)
123                    throws ChannelException {
124    
125                    ChannelHub channelHub = getChannelHub(companyId);
126    
127                    channelHub.deleteUserNotificiationEvents(
128                            userId, notificationEventUuids);
129            }
130    
131            @Override
132            public void destroyChannel(long companyId, long userId)
133                    throws ChannelException {
134    
135                    ChannelHub channelHub = getChannelHub(companyId);
136    
137                    channelHub.destroyChannel(userId);
138    
139                    if (!ClusterInvokeThreadLocal.isEnabled()) {
140                            return;
141                    }
142    
143                    MethodHandler methodHandler = new MethodHandler(
144                            _destroyChannelMethodKey, companyId, userId);
145    
146                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
147                            methodHandler, true);
148    
149                    try {
150                            ClusterExecutorUtil.execute(clusterRequest);
151                    }
152                    catch (Exception e) {
153                            throw new ChannelException(
154                                    "Unable to destroy channel across cluster", e);
155                    }
156            }
157    
158            @Override
159            public void destroyChannelHub(long companyId) throws ChannelException {
160                    ChannelHub channelHub = _channelHubs.remove(companyId);
161    
162                    if (channelHub != null) {
163                            channelHub.destroy();
164                    }
165            }
166    
167            @Override
168            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
169                    return fetchChannelHub(companyId, false);
170            }
171    
172            @Override
173            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
174                    throws ChannelException {
175    
176                    ChannelHub channelHub = _channelHubs.get(companyId);
177    
178                    if (channelHub == null) {
179                            synchronized(_channelHubs) {
180                                    channelHub = _channelHubs.get(companyId);
181    
182                                    if (channelHub == null) {
183                                            if (createIfAbsent) {
184                                                    channelHub = createChannelHub(companyId);
185                                            }
186                                    }
187                            }
188                    }
189    
190                    return channelHub;
191            }
192    
193            @Override
194            public List<NotificationEvent> fetchNotificationEvents(
195                            long companyId, long userId, boolean flush)
196                    throws ChannelException {
197    
198                    ChannelHub channelHub = fetchChannelHub(companyId);
199    
200                    if (channelHub == null) {
201                            return Collections.emptyList();
202                    }
203    
204                    return channelHub.fetchNotificationEvents(userId, flush);
205            }
206    
207            @Override
208            public void flush() throws ChannelException {
209                    for (ChannelHub channelHub : _channelHubs.values()) {
210                            channelHub.flush();
211                    }
212            }
213    
214            @Override
215            public void flush(long companyId) throws ChannelException {
216                    ChannelHub channelHub = fetchChannelHub(companyId);
217    
218                    if (channelHub != null) {
219                            channelHub.flush();
220                    }
221            }
222    
223            @Override
224            public void flush(long companyId, long userId, long timestamp)
225                    throws ChannelException {
226    
227                    ChannelHub channelHub = fetchChannelHub(companyId);
228    
229                    if (channelHub != null) {
230                            channelHub.flush(userId, timestamp);
231                    }
232            }
233    
234            @Override
235            public Channel getChannel(long companyId, long userId)
236                    throws ChannelException {
237    
238                    return getChannel(companyId, userId, false);
239            }
240    
241            @Override
242            public Channel getChannel(
243                            long companyId, long userId, boolean createIfAbsent)
244                    throws ChannelException {
245    
246                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
247    
248                    return channelHub.getChannel(userId, createIfAbsent);
249            }
250    
251            @Override
252            public ChannelHub getChannelHub(long companyId) throws ChannelException {
253                    return getChannelHub(companyId, false);
254            }
255    
256            @Override
257            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
258                    throws ChannelException {
259    
260                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
261    
262                    if (channelHub == null) {
263                            throw new UnknownChannelHubException(
264                                    "No channel exists with company id " + companyId);
265                    }
266    
267                    return channelHub;
268            }
269    
270            @Override
271            public List<NotificationEvent> getNotificationEvents(
272                            long companyId, long userId)
273                    throws ChannelException {
274    
275                    ChannelHub channelHub = getChannelHub(companyId);
276    
277                    return channelHub.getNotificationEvents(userId);
278            }
279    
280            @Override
281            public List<NotificationEvent> getNotificationEvents(
282                            long companyId, long userId, boolean flush)
283                    throws ChannelException {
284    
285                    ChannelHub channelHub = getChannelHub(companyId);
286    
287                    return channelHub.getNotificationEvents(userId, flush);
288            }
289    
290            @Override
291            public Collection<Long> getUserIds(long companyId) throws ChannelException {
292                    ChannelHub channelHub = getChannelHub(companyId);
293    
294                    return channelHub.getUserIds();
295            }
296    
297            @Override
298            public void registerChannelListener(
299                            long companyId, long userId, ChannelListener channelListener)
300                    throws ChannelException {
301    
302                    ChannelHub channelHub = getChannelHub(companyId);
303    
304                    channelHub.registerChannelListener(userId, channelListener);
305            }
306    
307            @Override
308            public void removeTransientNotificationEvents(
309                            long companyId, long userId,
310                            Collection<NotificationEvent> notificationEvents)
311                    throws ChannelException {
312    
313                    ChannelHub channelHub = getChannelHub(companyId);
314    
315                    channelHub.removeTransientNotificationEvents(
316                            userId, notificationEvents);
317            }
318    
319            @Override
320            public void removeTransientNotificationEventsByUuid(
321                            long companyId, long userId,
322                            Collection<String> notificationEventUuids)
323                    throws ChannelException {
324    
325                    ChannelHub channelHub = getChannelHub(companyId);
326    
327                    channelHub.removeTransientNotificationEventsByUuid(
328                            userId, notificationEventUuids);
329            }
330    
331            @Override
332            public void sendNotificationEvent(
333                            long companyId, long userId, NotificationEvent notificationEvent)
334                    throws ChannelException {
335    
336                    ChannelHub channelHub = getChannelHub(companyId);
337    
338                    channelHub.sendNotificationEvent(userId, notificationEvent);
339    
340                    if (!ClusterInvokeThreadLocal.isEnabled()) {
341                            return;
342                    }
343    
344                    MethodHandler methodHandler = new MethodHandler(
345                            _storeNotificationEventMethodKey, companyId, userId,
346                            notificationEvent);
347    
348                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
349                            methodHandler, true);
350    
351                    try {
352                            ClusterExecutorUtil.execute(clusterRequest);
353                    }
354                    catch (Exception e) {
355                            throw new ChannelException("Unable to notify cluster of event", e);
356                    }
357            }
358    
359            @Override
360            public void sendNotificationEvents(
361                            long companyId, long userId,
362                            Collection<NotificationEvent> notificationEvents)
363                    throws ChannelException {
364    
365                    ChannelHub channelHub = getChannelHub(companyId);
366    
367                    channelHub.sendNotificationEvents(userId, notificationEvents);
368            }
369    
370            @Override
371            public void storeNotificationEvent(
372                            long companyId, long userId, NotificationEvent notificationEvent)
373                    throws ChannelException {
374    
375                    ChannelHub channelHub = fetchChannelHub(companyId);
376    
377                    if (channelHub != null) {
378                            channelHub.storeNotificationEvent(userId, notificationEvent);
379                    }
380                    else if (_log.isDebugEnabled()) {
381                            _log.debug("No channel hub exists for company " + companyId);
382                    }
383            }
384    
385            @Override
386            public void unregisterChannelListener(
387                            long companyId, long userId, ChannelListener channelListener)
388                    throws ChannelException {
389    
390                    ChannelHub channelHub = getChannelHub(companyId);
391    
392                    channelHub.unregisterChannelListener(userId, channelListener);
393            }
394    
395            private static final Log _log = LogFactoryUtil.getLog(
396                    ChannelHubManagerImpl.class);
397    
398            private static final MethodKey _destroyChannelMethodKey = new MethodKey(
399                    ChannelHubManagerUtil.class, "destroyChannel", long.class, long.class);
400            private static final MethodKey _storeNotificationEventMethodKey =
401                    new MethodKey(
402                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
403                            long.class, NotificationEvent.class);
404    
405            private final ConcurrentMap<Long, ChannelHub> _channelHubs =
406                    new ConcurrentHashMap<>();
407    
408    }