001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.notifications.Channel;
018 import com.liferay.portal.kernel.notifications.ChannelException;
019 import com.liferay.portal.kernel.notifications.ChannelHub;
020 import com.liferay.portal.kernel.notifications.ChannelListener;
021 import com.liferay.portal.kernel.notifications.NotificationEvent;
022 import com.liferay.portal.kernel.notifications.UnknownChannelException;
023 import com.liferay.portal.model.CompanyConstants;
024 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025 import com.liferay.portal.util.PropsValues;
026
027 import java.util.ArrayList;
028 import java.util.Collection;
029 import java.util.Collections;
030 import java.util.Iterator;
031 import java.util.List;
032 import java.util.Map;
033 import java.util.Set;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.ConcurrentMap;
036
037
042 public class ChannelHubImpl implements ChannelHub {
043
044 public void cleanUp() throws ChannelException {
045 for (Channel channel : _channels.values()) {
046 channel.cleanUp();
047 }
048 }
049
050 public void cleanUp(long userId) throws ChannelException {
051 Channel channel = getChannel(userId);
052
053 channel.cleanUp();
054 }
055
056 public ChannelHub clone(long companyId) {
057 ChannelHubImpl channelHubImpl = new ChannelHubImpl();
058
059 channelHubImpl.setChannelPrototype(_channel);
060 channelHubImpl.setCompanyId(companyId);
061
062 return channelHubImpl;
063 }
064
065 public void confirmDelivery(
066 long userId, Collection<String> notificationEventUuids)
067 throws ChannelException {
068
069 confirmDelivery(userId, notificationEventUuids, false);
070 }
071
072 public void confirmDelivery(
073 long userId, Collection<String> notificationEventUuids,
074 boolean archive)
075 throws ChannelException {
076
077 Channel channel = getChannel(userId);
078
079 channel.confirmDelivery(notificationEventUuids, archive);
080 }
081
082 public void confirmDelivery(long userId, String notificationEventUuid)
083 throws ChannelException {
084
085 confirmDelivery(userId, notificationEventUuid, false);
086 }
087
088 public void confirmDelivery(
089 long userId, String notificationEventUuid, boolean archive)
090 throws ChannelException {
091
092 Channel channel = getChannel(userId);
093
094 channel.confirmDelivery(notificationEventUuid, archive);
095 }
096
097 public Channel createChannel(long userId) throws ChannelException {
098 if (_channels.containsKey(userId)) {
099 return _channels.get(userId);
100 }
101
102 Channel channel = _channel.clone(_companyId, userId);
103
104 Channel oldChannel = _channels.putIfAbsent(userId, channel);
105
106 if (oldChannel != null) {
107 channel.sendNotificationEvents(oldChannel.getNotificationEvents());
108 }
109
110 channel.init();
111
112 return channel;
113 }
114
115 public void deleteUserNotificiationEvent(
116 long userId, String notificationEventUuid)
117 throws ChannelException {
118
119 Channel channel = getChannel(userId);
120
121 channel.deleteUserNotificiationEvent(notificationEventUuid);
122 }
123
124 public void deleteUserNotificiationEvents(
125 long userId, Collection<String> notificationEventUuids)
126 throws ChannelException {
127
128 Channel channel = getChannel(userId);
129
130 channel.deleteUserNotificiationEvents(notificationEventUuids);
131 }
132
133 public void destroy() throws ChannelException {
134 Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
135
136 Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
137
138 while (itr.hasNext()) {
139 Channel channel = itr.next().getValue();
140
141 channel.close();
142
143 itr.remove();
144 }
145 }
146
147 public Channel destroyChannel(long userId) throws ChannelException {
148 Channel channel = _channels.remove(userId);
149
150 if (channel != null) {
151 channel.close();
152 }
153
154 return channel;
155 }
156
157 public Channel fetchChannel(long userId) throws ChannelException {
158 return fetchChannel(userId, false);
159 }
160
161 public Channel fetchChannel(long userId, boolean createIfAbsent)
162 throws ChannelException {
163
164 Channel channel = _channels.get(userId);
165
166 if (channel == null) {
167 synchronized (_channels) {
168 channel = _channels.get(userId);
169
170 if (channel == null) {
171 if (createIfAbsent) {
172 channel = createChannel(userId);
173 }
174 }
175 }
176 }
177
178 return channel;
179 }
180
181 public List<NotificationEvent> fetchNotificationEvents(long userId)
182 throws ChannelException {
183
184 return fetchNotificationEvents(userId, false);
185 }
186
187 public List<NotificationEvent> fetchNotificationEvents(
188 long userId, boolean flush)
189 throws ChannelException {
190
191 Channel channel = fetchChannel(userId);
192
193 if (channel == null) {
194 return Collections.emptyList();
195 }
196
197 return channel.getNotificationEvents(flush);
198 }
199
200 public void flush() throws ChannelException {
201 for (Channel channel : _channels.values()) {
202 channel.flush();
203 }
204 }
205
206 public void flush(long userId) throws ChannelException {
207 Channel channel = fetchChannel(userId);
208
209 if (channel != null) {
210 channel.flush();
211 }
212 }
213
214 public void flush(long userId, long timestamp) throws ChannelException {
215 Channel channel = fetchChannel(userId);
216
217 if (channel != null) {
218 channel.flush(timestamp);
219 }
220 }
221
222 public Channel getChannel(long userId) throws ChannelException {
223 return getChannel(userId, false);
224 }
225
226 public Channel getChannel(long userId, boolean createIfAbsent)
227 throws ChannelException {
228
229 Channel channel = fetchChannel(userId, createIfAbsent);
230
231 if (channel == null) {
232 throw new UnknownChannelException(
233 "No channel exists with user id " + userId);
234 }
235
236 return channel;
237 }
238
239 public long getCompanyId() {
240 return _companyId;
241 }
242
243 public List<NotificationEvent> getNotificationEvents(long userId)
244 throws ChannelException {
245
246 return getNotificationEvents(userId, false);
247 }
248
249 public List<NotificationEvent> getNotificationEvents(
250 long userId, boolean flush)
251 throws ChannelException {
252
253 Channel channel = getChannel(userId, false);
254
255 return channel.getNotificationEvents(flush);
256 }
257
258 public Collection<Long> getUserIds() {
259 return Collections.unmodifiableSet(_channels.keySet());
260 }
261
262 public void registerChannelListener(
263 long userId, ChannelListener channelListener)
264 throws ChannelException {
265
266 Channel channel = getChannel(userId);
267
268 channel.registerChannelListener(channelListener);
269 }
270
271 public void removeTransientNotificationEvents(
272 long userId, Collection<NotificationEvent> notificationEvents)
273 throws ChannelException {
274
275 Channel channel = fetchChannel(userId);
276
277 if (channel != null) {
278 channel.removeTransientNotificationEvents(notificationEvents);
279 }
280 }
281
282 public void removeTransientNotificationEventsByUuid(
283 long userId, Collection<String> notificationEventUuids)
284 throws ChannelException {
285
286 Channel channel = fetchChannel(userId);
287
288 if (channel != null) {
289 channel.removeTransientNotificationEventsByUuid(
290 notificationEventUuids);
291 }
292 }
293
294 public void sendNotificationEvent(
295 long userId, NotificationEvent notificationEvent)
296 throws ChannelException {
297
298 Channel channel = fetchChannel(userId);
299
300 if (channel != null) {
301 channel.sendNotificationEvent(notificationEvent);
302
303 return;
304 }
305
306 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
307 !notificationEvent.isDeliveryRequired()) {
308
309 return;
310 }
311
312 try {
313 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
314 userId, notificationEvent);
315 }
316 catch (Exception e) {
317 throw new ChannelException("Unable to send event", e);
318 }
319 }
320
321 public void sendNotificationEvents(
322 long userId, Collection<NotificationEvent> notificationEvents)
323 throws ChannelException {
324
325 Channel channel = fetchChannel(userId);
326
327 if (channel != null) {
328 channel.sendNotificationEvents(notificationEvents);
329
330 return;
331 }
332
333 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
334 return;
335 }
336
337 List<NotificationEvent> persistedNotificationEvents =
338 new ArrayList<NotificationEvent>(notificationEvents.size());
339
340 for (NotificationEvent notificationEvent : notificationEvents) {
341 if (notificationEvent.isDeliveryRequired()) {
342 persistedNotificationEvents.add(notificationEvent);
343 }
344 }
345
346 try {
347 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
348 userId, persistedNotificationEvents);
349 }
350 catch (Exception e) {
351 throw new ChannelException("Unable to send events", e);
352 }
353 }
354
355 public void setChannelPrototype(Channel channel) {
356 _channel = channel;
357 }
358
359 public void setCompanyId(long companyId) {
360 _companyId = companyId;
361 }
362
363 public void unregisterChannelListener(
364 long userId, ChannelListener channelListener)
365 throws ChannelException {
366
367 Channel channel = getChannel(userId);
368
369 channel.unregisterChannelListener(channelListener);
370 }
371
372 private Channel _channel;
373 private ConcurrentMap<Long, Channel> _channels =
374 new ConcurrentHashMap<Long, Channel>();
375 private long _companyId = CompanyConstants.SYSTEM;
376
377 }