001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019    import com.liferay.portal.kernel.cluster.ClusterRequest;
020    import com.liferay.portal.kernel.notifications.Channel;
021    import com.liferay.portal.kernel.notifications.ChannelException;
022    import com.liferay.portal.kernel.notifications.ChannelHub;
023    import com.liferay.portal.kernel.notifications.ChannelHubManager;
024    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
025    import com.liferay.portal.kernel.notifications.ChannelListener;
026    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
027    import com.liferay.portal.kernel.notifications.NotificationEvent;
028    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
029    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
030    import com.liferay.portal.kernel.util.MethodHandler;
031    import com.liferay.portal.kernel.util.MethodKey;
032    
033    import java.util.Collection;
034    import java.util.Collections;
035    import java.util.List;
036    import java.util.concurrent.ConcurrentHashMap;
037    import java.util.concurrent.ConcurrentMap;
038    
039    /**
040     * @author Edward Han
041     * @author Brian Wing Shun
042     * @author Shuyang Zhou
043     */
044    @DoPrivileged
045    public class ChannelHubManagerImpl implements ChannelHubManager {
046    
047            @Override
048            public void confirmDelivery(
049                            long companyId, long userId,
050                            Collection<String> notificationEventUuids)
051                    throws ChannelException {
052    
053                    confirmDelivery(companyId, userId, notificationEventUuids, false);
054            }
055    
056            @Override
057            public void confirmDelivery(
058                            long companyId, long userId,
059                            Collection<String> notificationEventUuids, boolean archive)
060                    throws ChannelException {
061    
062                    ChannelHub channelHub = getChannelHub(companyId);
063    
064                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
065            }
066    
067            @Override
068            public void confirmDelivery(
069                            long companyId, long userId, String notificationEventUuid)
070                    throws ChannelException {
071    
072                    confirmDelivery(companyId, userId, notificationEventUuid, false);
073            }
074    
075            @Override
076            public void confirmDelivery(
077                            long companyId, long userId, String notificationEventUuid,
078                            boolean archive)
079                    throws ChannelException {
080    
081                    ChannelHub channelHub = getChannelHub(companyId);
082    
083                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
084            }
085    
086            @Override
087            public Channel createChannel(long companyId, long userId)
088                    throws ChannelException {
089    
090                    ChannelHub channelHub = getChannelHub(companyId);
091    
092                    return channelHub.createChannel(userId);
093            }
094    
095            @Override
096            public ChannelHub createChannelHub(long companyId) throws ChannelException {
097                    ChannelHub channelHub = _channelHub.clone(companyId);
098    
099                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
100                            throw new DuplicateChannelHubException(
101                                    "Channel already exists with company id " + companyId);
102                    }
103    
104                    return channelHub;
105            }
106    
107            @Override
108            public void deleteUserNotificiationEvent(
109                            long companyId, long userId, String notificationEventUuid)
110                    throws ChannelException {
111    
112                    ChannelHub channelHub = getChannelHub(companyId);
113    
114                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
115            }
116    
117            @Override
118            public void deleteUserNotificiationEvents(
119                            long companyId, long userId,
120                            Collection<String> notificationEventUuids)
121                    throws ChannelException {
122    
123                    ChannelHub channelHub = getChannelHub(companyId);
124    
125                    channelHub.deleteUserNotificiationEvents(
126                            userId, notificationEventUuids);
127            }
128    
129            @Override
130            public void destroyChannel(long companyId, long userId)
131                    throws ChannelException {
132    
133                    ChannelHub channelHub = getChannelHub(companyId);
134    
135                    channelHub.destroyChannel(userId);
136            }
137    
138            @Override
139            public void destroyChannelHub(long companyId) throws ChannelException {
140                    ChannelHub channelHub = _channelHubs.remove(companyId);
141    
142                    if (channelHub != null) {
143                            channelHub.destroy();
144                    }
145            }
146    
147            @Override
148            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
149                    return fetchChannelHub(companyId, false);
150            }
151    
152            @Override
153            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
154                    throws ChannelException {
155    
156                    ChannelHub channelHub = _channelHubs.get(companyId);
157    
158                    if (channelHub == null) {
159                            synchronized(_channelHubs) {
160                                    channelHub = _channelHubs.get(companyId);
161    
162                                    if (channelHub == null) {
163                                            if (createIfAbsent) {
164                                                    channelHub = createChannelHub(companyId);
165                                            }
166                                    }
167                            }
168                    }
169    
170                    return channelHub;
171            }
172    
173            @Override
174            public List<NotificationEvent> fetchNotificationEvents(
175                            long companyId, long userId, boolean flush)
176                    throws ChannelException {
177    
178                    ChannelHub channelHub = fetchChannelHub(companyId);
179    
180                    if (channelHub == null) {
181                            return Collections.emptyList();
182                    }
183    
184                    return channelHub.fetchNotificationEvents(userId, flush);
185            }
186    
187            @Override
188            public void flush() throws ChannelException {
189                    for (ChannelHub channelHub : _channelHubs.values()) {
190                            channelHub.flush();
191                    }
192            }
193    
194            @Override
195            public void flush(long companyId) throws ChannelException {
196                    ChannelHub channelHub = fetchChannelHub(companyId);
197    
198                    if (channelHub != null) {
199                            channelHub.flush();
200                    }
201            }
202    
203            @Override
204            public void flush(long companyId, long userId, long timestamp)
205                    throws ChannelException {
206    
207                    ChannelHub channelHub = fetchChannelHub(companyId);
208    
209                    if (channelHub != null) {
210                            channelHub.flush(userId, timestamp);
211                    }
212            }
213    
214            @Override
215            public Channel getChannel(long companyId, long userId)
216                    throws ChannelException {
217    
218                    return getChannel(companyId, userId, false);
219            }
220    
221            @Override
222            public Channel getChannel(
223                            long companyId, long userId, boolean createIfAbsent)
224                    throws ChannelException {
225    
226                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
227    
228                    return channelHub.getChannel(userId, createIfAbsent);
229            }
230    
231            @Override
232            public ChannelHub getChannelHub(long companyId) throws ChannelException {
233                    return getChannelHub(companyId, false);
234            }
235    
236            @Override
237            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
238                    throws ChannelException {
239    
240                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
241    
242                    if (channelHub == null) {
243                            throw new UnknownChannelHubException(
244                                    "No channel exists with company id " + companyId);
245                    }
246    
247                    return channelHub;
248            }
249    
250            @Override
251            public List<NotificationEvent> getNotificationEvents(
252                            long companyId, long userId)
253                    throws ChannelException {
254    
255                    ChannelHub channelHub = getChannelHub(companyId);
256    
257                    return channelHub.getNotificationEvents(userId);
258            }
259    
260            @Override
261            public List<NotificationEvent> getNotificationEvents(
262                            long companyId, long userId, boolean flush)
263                    throws ChannelException {
264    
265                    ChannelHub channelHub = getChannelHub(companyId);
266    
267                    return channelHub.getNotificationEvents(userId, flush);
268            }
269    
270            @Override
271            public Collection<Long> getUserIds(long companyId) throws ChannelException {
272                    ChannelHub channelHub = getChannelHub(companyId);
273    
274                    return channelHub.getUserIds();
275            }
276    
277            @Override
278            public void registerChannelListener(
279                            long companyId, long userId, ChannelListener channelListener)
280                    throws ChannelException {
281    
282                    ChannelHub channelHub = getChannelHub(companyId);
283    
284                    channelHub.registerChannelListener(userId, channelListener);
285            }
286    
287            @Override
288            public void removeTransientNotificationEvents(
289                            long companyId, long userId,
290                            Collection<NotificationEvent> notificationEvents)
291                    throws ChannelException {
292    
293                    ChannelHub channelHub = getChannelHub(companyId);
294    
295                    channelHub.removeTransientNotificationEvents(
296                            userId, notificationEvents);
297            }
298    
299            @Override
300            public void removeTransientNotificationEventsByUuid(
301                            long companyId, long userId,
302                            Collection<String> notificationEventUuids)
303                    throws ChannelException {
304    
305                    ChannelHub channelHub = getChannelHub(companyId);
306    
307                    channelHub.removeTransientNotificationEventsByUuid(
308                            userId, notificationEventUuids);
309            }
310    
311            @Override
312            public void sendNotificationEvent(
313                            long companyId, long userId, NotificationEvent notificationEvent)
314                    throws ChannelException {
315    
316                    ChannelHub channelHub = getChannelHub(companyId);
317    
318                    channelHub.sendNotificationEvent(userId, notificationEvent);
319    
320                    if (!ClusterInvokeThreadLocal.isEnabled()) {
321                            return;
322                    }
323    
324                    MethodHandler methodHandler = new MethodHandler(
325                            _storeNotificationEventMethodKey, companyId, userId,
326                            notificationEvent);
327    
328                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
329                            methodHandler, true);
330    
331                    try {
332                            ClusterExecutorUtil.execute(clusterRequest);
333                    }
334                    catch (Exception e) {
335                            throw new ChannelException("Unable to notify cluster of event", e);
336                    }
337            }
338    
339            @Override
340            public void sendNotificationEvents(
341                            long companyId, long userId,
342                            Collection<NotificationEvent> notificationEvents)
343                    throws ChannelException {
344    
345                    ChannelHub channelHub = getChannelHub(companyId);
346    
347                    channelHub.sendNotificationEvents(userId, notificationEvents);
348            }
349    
350            public void setChannelHubPrototype(ChannelHub channelHub) {
351                    _channelHub = channelHub;
352            }
353    
354            @Override
355            public void storeNotificationEvent(
356                            long companyId, long userId, NotificationEvent notificationEvent)
357                    throws ChannelException {
358    
359                    ChannelHub channelHub = getChannelHub(companyId);
360    
361                    channelHub.storeNotificationEvent(userId, notificationEvent);
362            }
363    
364            @Override
365            public void unregisterChannelListener(
366                            long companyId, long userId, ChannelListener channelListener)
367                    throws ChannelException {
368    
369                    ChannelHub channelHub = getChannelHub(companyId);
370    
371                    channelHub.unregisterChannelListener(userId, channelListener);
372            }
373    
374            private static final MethodKey _storeNotificationEventMethodKey =
375                    new MethodKey(
376                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
377                            long.class, NotificationEvent.class);
378    
379            private ChannelHub _channelHub;
380            private ConcurrentMap<Long, ChannelHub> _channelHubs =
381                    new ConcurrentHashMap<Long, ChannelHub>();
382    
383    }