001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.notifications.Channel;
018 import com.liferay.portal.kernel.notifications.ChannelException;
019 import com.liferay.portal.kernel.notifications.ChannelHub;
020 import com.liferay.portal.kernel.notifications.ChannelListener;
021 import com.liferay.portal.kernel.notifications.NotificationEvent;
022 import com.liferay.portal.kernel.notifications.UnknownChannelException;
023 import com.liferay.portal.kernel.service.UserNotificationEventLocalServiceUtil;
024 import com.liferay.portal.util.PropsValues;
025
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.Collections;
029 import java.util.Iterator;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.Set;
033 import java.util.concurrent.ConcurrentHashMap;
034 import java.util.concurrent.ConcurrentMap;
035
036
041 public class ChannelHubImpl implements ChannelHub {
042
043 public ChannelHubImpl(long companyId) {
044 _companyId = companyId;
045 }
046
047 @Override
048 public void cleanUp() throws ChannelException {
049 for (Channel channel : _channels.values()) {
050 channel.cleanUp();
051 }
052 }
053
054 @Override
055 public void cleanUp(long userId) throws ChannelException {
056 Channel channel = getChannel(userId);
057
058 channel.cleanUp();
059 }
060
061 @Override
062 public void confirmDelivery(
063 long userId, Collection<String> notificationEventUuids)
064 throws ChannelException {
065
066 confirmDelivery(userId, notificationEventUuids, false);
067 }
068
069 @Override
070 public void confirmDelivery(
071 long userId, Collection<String> notificationEventUuids,
072 boolean archive)
073 throws ChannelException {
074
075 Channel channel = getChannel(userId);
076
077 channel.confirmDelivery(notificationEventUuids, archive);
078 }
079
080 @Override
081 public void confirmDelivery(long userId, String notificationEventUuid)
082 throws ChannelException {
083
084 confirmDelivery(userId, notificationEventUuid, false);
085 }
086
087 @Override
088 public void confirmDelivery(
089 long userId, String notificationEventUuid, boolean archive)
090 throws ChannelException {
091
092 Channel channel = getChannel(userId);
093
094 channel.confirmDelivery(notificationEventUuid, archive);
095 }
096
097 @Override
098 public Channel createChannel(long userId) throws ChannelException {
099 if (_channels.containsKey(userId)) {
100 return _channels.get(userId);
101 }
102
103 Channel channel = new ChannelImpl(_companyId, userId);
104
105 Channel oldChannel = _channels.putIfAbsent(userId, channel);
106
107 if (oldChannel != null) {
108 channel.sendNotificationEvents(oldChannel.getNotificationEvents());
109 }
110
111 channel.init();
112
113 return channel;
114 }
115
116 @Override
117 public void deleteUserNotificiationEvent(
118 long userId, String notificationEventUuid)
119 throws ChannelException {
120
121 Channel channel = getChannel(userId);
122
123 channel.deleteUserNotificiationEvent(notificationEventUuid);
124 }
125
126 @Override
127 public void deleteUserNotificiationEvents(
128 long userId, Collection<String> notificationEventUuids)
129 throws ChannelException {
130
131 Channel channel = getChannel(userId);
132
133 channel.deleteUserNotificiationEvents(notificationEventUuids);
134 }
135
136 @Override
137 public void destroy() throws ChannelException {
138 Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
139
140 Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
141
142 while (itr.hasNext()) {
143 Channel channel = itr.next().getValue();
144
145 channel.close();
146
147 itr.remove();
148 }
149 }
150
151 @Override
152 public Channel destroyChannel(long userId) throws ChannelException {
153 Channel channel = _channels.remove(userId);
154
155 if (channel != null) {
156 channel.close();
157 }
158
159 return channel;
160 }
161
162 @Override
163 public Channel fetchChannel(long userId) throws ChannelException {
164 return fetchChannel(userId, false);
165 }
166
167 @Override
168 public Channel fetchChannel(long userId, boolean createIfAbsent)
169 throws ChannelException {
170
171 Channel channel = _channels.get(userId);
172
173 if (channel == null) {
174 synchronized (_channels) {
175 channel = _channels.get(userId);
176
177 if (channel == null) {
178 if (createIfAbsent) {
179 channel = createChannel(userId);
180 }
181 }
182 }
183 }
184
185 return channel;
186 }
187
188 @Override
189 public List<NotificationEvent> fetchNotificationEvents(long userId)
190 throws ChannelException {
191
192 return fetchNotificationEvents(userId, false);
193 }
194
195 @Override
196 public List<NotificationEvent> fetchNotificationEvents(
197 long userId, boolean flush)
198 throws ChannelException {
199
200 Channel channel = fetchChannel(userId);
201
202 if (channel == null) {
203 return Collections.emptyList();
204 }
205
206 return channel.getNotificationEvents(flush);
207 }
208
209 @Override
210 public void flush() throws ChannelException {
211 for (Channel channel : _channels.values()) {
212 channel.flush();
213 }
214 }
215
216 @Override
217 public void flush(long userId) throws ChannelException {
218 Channel channel = fetchChannel(userId);
219
220 if (channel != null) {
221 channel.flush();
222 }
223 }
224
225 @Override
226 public void flush(long userId, long timestamp) throws ChannelException {
227 Channel channel = fetchChannel(userId);
228
229 if (channel != null) {
230 channel.flush(timestamp);
231 }
232 }
233
234 @Override
235 public Channel getChannel(long userId) throws ChannelException {
236 return getChannel(userId, false);
237 }
238
239 @Override
240 public Channel getChannel(long userId, boolean createIfAbsent)
241 throws ChannelException {
242
243 Channel channel = fetchChannel(userId, createIfAbsent);
244
245 if (channel == null) {
246 throw new UnknownChannelException(
247 "No channel exists with user id " + userId);
248 }
249
250 return channel;
251 }
252
253 public long getCompanyId() {
254 return _companyId;
255 }
256
257 @Override
258 public List<NotificationEvent> getNotificationEvents(long userId)
259 throws ChannelException {
260
261 return getNotificationEvents(userId, false);
262 }
263
264 @Override
265 public List<NotificationEvent> getNotificationEvents(
266 long userId, boolean flush)
267 throws ChannelException {
268
269 Channel channel = getChannel(userId, false);
270
271 return channel.getNotificationEvents(flush);
272 }
273
274 @Override
275 public Collection<Long> getUserIds() {
276 return Collections.unmodifiableSet(_channels.keySet());
277 }
278
279 @Override
280 public void registerChannelListener(
281 long userId, ChannelListener channelListener)
282 throws ChannelException {
283
284 Channel channel = getChannel(userId);
285
286 channel.registerChannelListener(channelListener);
287 }
288
289 @Override
290 public void removeTransientNotificationEvents(
291 long userId, Collection<NotificationEvent> notificationEvents)
292 throws ChannelException {
293
294 Channel channel = fetchChannel(userId);
295
296 if (channel != null) {
297 channel.removeTransientNotificationEvents(notificationEvents);
298 }
299 }
300
301 @Override
302 public void removeTransientNotificationEventsByUuid(
303 long userId, Collection<String> notificationEventUuids)
304 throws ChannelException {
305
306 Channel channel = fetchChannel(userId);
307
308 if (channel != null) {
309 channel.removeTransientNotificationEventsByUuid(
310 notificationEventUuids);
311 }
312 }
313
314 @Override
315 public void sendNotificationEvent(
316 long userId, NotificationEvent notificationEvent)
317 throws ChannelException {
318
319 Channel channel = fetchChannel(userId);
320
321 if (channel != null) {
322 channel.sendNotificationEvent(notificationEvent);
323
324 return;
325 }
326
327 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
328 !notificationEvent.isDeliveryRequired()) {
329
330 return;
331 }
332
333 try {
334 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
335 userId, notificationEvent);
336 }
337 catch (Exception e) {
338 throw new ChannelException("Unable to send event", e);
339 }
340 }
341
342 @Override
343 public void sendNotificationEvents(
344 long userId, Collection<NotificationEvent> notificationEvents)
345 throws ChannelException {
346
347 Channel channel = fetchChannel(userId);
348
349 if (channel != null) {
350 channel.sendNotificationEvents(notificationEvents);
351
352 return;
353 }
354
355 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
356 return;
357 }
358
359 List<NotificationEvent> persistedNotificationEvents = new ArrayList<>(
360 notificationEvents.size());
361
362 for (NotificationEvent notificationEvent : notificationEvents) {
363 if (notificationEvent.isDeliveryRequired()) {
364 persistedNotificationEvents.add(notificationEvent);
365 }
366 }
367
368 try {
369 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
370 userId, persistedNotificationEvents);
371 }
372 catch (Exception e) {
373 throw new ChannelException("Unable to send events", e);
374 }
375 }
376
377 @Override
378 public void storeNotificationEvent(
379 long userId, NotificationEvent notificationEvent)
380 throws ChannelException {
381
382 Channel channel = fetchChannel(userId);
383
384 if (channel != null) {
385 channel.storeNotificationEvent(
386 notificationEvent, System.currentTimeMillis());
387 }
388 }
389
390 @Override
391 public void unregisterChannelListener(
392 long userId, ChannelListener channelListener)
393 throws ChannelException {
394
395 Channel channel = getChannel(userId);
396
397 channel.unregisterChannelListener(channelListener);
398 }
399
400 private final ConcurrentMap<Long, Channel> _channels =
401 new ConcurrentHashMap<>();
402 private final long _companyId;
403
404 }