001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.notifications.Channel;
020 import com.liferay.portal.kernel.notifications.ChannelException;
021 import com.liferay.portal.kernel.notifications.ChannelHub;
022 import com.liferay.portal.kernel.notifications.ChannelHubManager;
023 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
024 import com.liferay.portal.kernel.notifications.ChannelListener;
025 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
028 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.List;
035 import java.util.concurrent.ConcurrentHashMap;
036 import java.util.concurrent.ConcurrentMap;
037
038
043 @DoPrivileged
044 public class ChannelHubManagerImpl implements ChannelHubManager {
045
046 @Override
047 public void confirmDelivery(
048 long companyId, long userId,
049 Collection<String> notificationEventUuids)
050 throws ChannelException {
051
052 confirmDelivery(companyId, userId, notificationEventUuids, false);
053 }
054
055 @Override
056 public void confirmDelivery(
057 long companyId, long userId,
058 Collection<String> notificationEventUuids, boolean archive)
059 throws ChannelException {
060
061 ChannelHub channelHub = getChannelHub(companyId);
062
063 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
064 }
065
066 @Override
067 public void confirmDelivery(
068 long companyId, long userId, String notificationEventUuid)
069 throws ChannelException {
070
071 confirmDelivery(companyId, userId, notificationEventUuid, false);
072 }
073
074 @Override
075 public void confirmDelivery(
076 long companyId, long userId, String notificationEventUuid,
077 boolean archive)
078 throws ChannelException {
079
080 ChannelHub channelHub = getChannelHub(companyId);
081
082 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
083 }
084
085 @Override
086 public Channel createChannel(long companyId, long userId)
087 throws ChannelException {
088
089 ChannelHub channelHub = getChannelHub(companyId);
090
091 return channelHub.createChannel(userId);
092 }
093
094 @Override
095 public ChannelHub createChannelHub(long companyId) throws ChannelException {
096 ChannelHub channelHub = _channelHub.clone(companyId);
097
098 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
099 throw new DuplicateChannelHubException(
100 "Channel already exists with company id " + companyId);
101 }
102
103 return channelHub;
104 }
105
106 @Override
107 public void deleteUserNotificiationEvent(
108 long companyId, long userId, String notificationEventUuid)
109 throws ChannelException {
110
111 ChannelHub channelHub = getChannelHub(companyId);
112
113 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
114 }
115
116 @Override
117 public void deleteUserNotificiationEvents(
118 long companyId, long userId,
119 Collection<String> notificationEventUuids)
120 throws ChannelException {
121
122 ChannelHub channelHub = getChannelHub(companyId);
123
124 channelHub.deleteUserNotificiationEvents(
125 userId, notificationEventUuids);
126 }
127
128 @Override
129 public void destroyChannel(long companyId, long userId)
130 throws ChannelException {
131
132 ChannelHub channelHub = getChannelHub(companyId);
133
134 channelHub.destroyChannel(userId);
135 }
136
137 @Override
138 public void destroyChannelHub(long companyId) throws ChannelException {
139 ChannelHub channelHub = _channelHubs.remove(companyId);
140
141 if (channelHub != null) {
142 channelHub.destroy();
143 }
144 }
145
146 @Override
147 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
148 return fetchChannelHub(companyId, false);
149 }
150
151 @Override
152 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
153 throws ChannelException {
154
155 ChannelHub channelHub = _channelHubs.get(companyId);
156
157 if (channelHub == null) {
158 synchronized(_channelHubs) {
159 channelHub = _channelHubs.get(companyId);
160
161 if (channelHub == null) {
162 if (createIfAbsent) {
163 channelHub = createChannelHub(companyId);
164 }
165 }
166 }
167 }
168
169 return channelHub;
170 }
171
172 @Override
173 public List<NotificationEvent> fetchNotificationEvents(
174 long companyId, long userId, boolean flush)
175 throws ChannelException {
176
177 ChannelHub channelHub = fetchChannelHub(companyId);
178
179 if (channelHub == null) {
180 return Collections.emptyList();
181 }
182
183 return channelHub.fetchNotificationEvents(userId, flush);
184 }
185
186 @Override
187 public void flush() throws ChannelException {
188 for (ChannelHub channelHub : _channelHubs.values()) {
189 channelHub.flush();
190 }
191 }
192
193 @Override
194 public void flush(long companyId) throws ChannelException {
195 ChannelHub channelHub = fetchChannelHub(companyId);
196
197 if (channelHub != null) {
198 channelHub.flush();
199 }
200 }
201
202 @Override
203 public void flush(long companyId, long userId, long timestamp)
204 throws ChannelException {
205
206 ChannelHub channelHub = fetchChannelHub(companyId);
207
208 if (channelHub != null) {
209 channelHub.flush(userId, timestamp);
210 }
211 }
212
213 @Override
214 public Channel getChannel(long companyId, long userId)
215 throws ChannelException {
216
217 return getChannel(companyId, userId, false);
218 }
219
220 @Override
221 public Channel getChannel(
222 long companyId, long userId, boolean createIfAbsent)
223 throws ChannelException {
224
225 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
226
227 return channelHub.getChannel(userId, createIfAbsent);
228 }
229
230 @Override
231 public ChannelHub getChannelHub(long companyId) throws ChannelException {
232 return getChannelHub(companyId, false);
233 }
234
235 @Override
236 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
237 throws ChannelException {
238
239 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
240
241 if (channelHub == null) {
242 throw new UnknownChannelHubException(
243 "No channel exists with company id " + companyId);
244 }
245
246 return channelHub;
247 }
248
249 @Override
250 public List<NotificationEvent> getNotificationEvents(
251 long companyId, long userId)
252 throws ChannelException {
253
254 ChannelHub channelHub = getChannelHub(companyId);
255
256 return channelHub.getNotificationEvents(userId);
257 }
258
259 @Override
260 public List<NotificationEvent> getNotificationEvents(
261 long companyId, long userId, boolean flush)
262 throws ChannelException {
263
264 ChannelHub channelHub = getChannelHub(companyId);
265
266 return channelHub.getNotificationEvents(userId, flush);
267 }
268
269 @Override
270 public Collection<Long> getUserIds(long companyId) throws ChannelException {
271 ChannelHub channelHub = getChannelHub(companyId);
272
273 return channelHub.getUserIds();
274 }
275
276 @Override
277 public void registerChannelListener(
278 long companyId, long userId, ChannelListener channelListener)
279 throws ChannelException {
280
281 ChannelHub channelHub = getChannelHub(companyId);
282
283 channelHub.registerChannelListener(userId, channelListener);
284 }
285
286 @Override
287 public void removeTransientNotificationEvents(
288 long companyId, long userId,
289 Collection<NotificationEvent> notificationEvents)
290 throws ChannelException {
291
292 ChannelHub channelHub = getChannelHub(companyId);
293
294 channelHub.removeTransientNotificationEvents(
295 userId, notificationEvents);
296 }
297
298 @Override
299 public void removeTransientNotificationEventsByUuid(
300 long companyId, long userId,
301 Collection<String> notificationEventUuids)
302 throws ChannelException {
303
304 ChannelHub channelHub = getChannelHub(companyId);
305
306 channelHub.removeTransientNotificationEventsByUuid(
307 userId, notificationEventUuids);
308 }
309
310 @Override
311 public void sendClusterNotificationEvent(
312 long companyId, long userId, NotificationEvent notificationEvent)
313 throws ChannelException {
314
315 sendNotificationEvent(companyId, userId, notificationEvent);
316
317 MethodHandler methodHandler = new MethodHandler(
318 _storeNotificationEventMethodKey, companyId, userId,
319 notificationEvent);
320
321 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
322 methodHandler, true);
323
324 try {
325 ClusterExecutorUtil.execute(clusterRequest);
326 }
327 catch (Exception e) {
328 throw new ChannelException("Unable to notify cluster of event", e);
329 }
330 }
331
332 @Override
333 public void sendNotificationEvent(
334 long companyId, long userId, NotificationEvent notificationEvent)
335 throws ChannelException {
336
337 ChannelHub channelHub = getChannelHub(companyId);
338
339 channelHub.sendNotificationEvent(userId, notificationEvent);
340 }
341
342 @Override
343 public void sendNotificationEvents(
344 long companyId, long userId,
345 Collection<NotificationEvent> notificationEvents)
346 throws ChannelException {
347
348 ChannelHub channelHub = getChannelHub(companyId);
349
350 channelHub.sendNotificationEvents(userId, notificationEvents);
351 }
352
353 public void setChannelHubPrototype(ChannelHub channelHub) {
354 _channelHub = channelHub;
355 }
356
357 @Override
358 public void storeNotificationEvent(
359 long companyId, long userId, NotificationEvent notificationEvent)
360 throws ChannelException {
361
362 ChannelHub channelHub = getChannelHub(companyId);
363
364 channelHub.storeNotificationEvent(userId, notificationEvent);
365 }
366
367 @Override
368 public void unregisterChannelListener(
369 long companyId, long userId, ChannelListener channelListener)
370 throws ChannelException {
371
372 ChannelHub channelHub = getChannelHub(companyId);
373
374 channelHub.unregisterChannelListener(userId, channelListener);
375 }
376
377 private static final MethodKey _storeNotificationEventMethodKey =
378 new MethodKey(
379 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
380 long.class, NotificationEvent.class);
381
382 private ChannelHub _channelHub;
383 private ConcurrentMap<Long, ChannelHub> _channelHubs =
384 new ConcurrentHashMap<Long, ChannelHub>();
385
386 }