001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019 import com.liferay.portal.kernel.cluster.ClusterRequest;
020 import com.liferay.portal.kernel.notifications.Channel;
021 import com.liferay.portal.kernel.notifications.ChannelException;
022 import com.liferay.portal.kernel.notifications.ChannelHub;
023 import com.liferay.portal.kernel.notifications.ChannelHubManager;
024 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
025 import com.liferay.portal.kernel.notifications.ChannelListener;
026 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
027 import com.liferay.portal.kernel.notifications.NotificationEvent;
028 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
029 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
030 import com.liferay.portal.kernel.util.MethodHandler;
031 import com.liferay.portal.kernel.util.MethodKey;
032
033 import java.util.Collection;
034 import java.util.Collections;
035 import java.util.List;
036 import java.util.concurrent.ConcurrentHashMap;
037 import java.util.concurrent.ConcurrentMap;
038
039
044 @DoPrivileged
045 public class ChannelHubManagerImpl implements ChannelHubManager {
046
047 @Override
048 public void confirmDelivery(
049 long companyId, long userId,
050 Collection<String> notificationEventUuids)
051 throws ChannelException {
052
053 confirmDelivery(companyId, userId, notificationEventUuids, false);
054 }
055
056 @Override
057 public void confirmDelivery(
058 long companyId, long userId,
059 Collection<String> notificationEventUuids, boolean archive)
060 throws ChannelException {
061
062 ChannelHub channelHub = getChannelHub(companyId);
063
064 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
065 }
066
067 @Override
068 public void confirmDelivery(
069 long companyId, long userId, String notificationEventUuid)
070 throws ChannelException {
071
072 confirmDelivery(companyId, userId, notificationEventUuid, false);
073 }
074
075 @Override
076 public void confirmDelivery(
077 long companyId, long userId, String notificationEventUuid,
078 boolean archive)
079 throws ChannelException {
080
081 ChannelHub channelHub = getChannelHub(companyId);
082
083 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
084 }
085
086 @Override
087 public Channel createChannel(long companyId, long userId)
088 throws ChannelException {
089
090 ChannelHub channelHub = getChannelHub(companyId);
091
092 return channelHub.createChannel(userId);
093 }
094
095 @Override
096 public ChannelHub createChannelHub(long companyId) throws ChannelException {
097 ChannelHub channelHub = _channelHub.clone(companyId);
098
099 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
100 throw new DuplicateChannelHubException(
101 "Channel already exists with company id " + companyId);
102 }
103
104 return channelHub;
105 }
106
107 @Override
108 public void deleteUserNotificiationEvent(
109 long companyId, long userId, String notificationEventUuid)
110 throws ChannelException {
111
112 ChannelHub channelHub = getChannelHub(companyId);
113
114 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
115 }
116
117 @Override
118 public void deleteUserNotificiationEvents(
119 long companyId, long userId,
120 Collection<String> notificationEventUuids)
121 throws ChannelException {
122
123 ChannelHub channelHub = getChannelHub(companyId);
124
125 channelHub.deleteUserNotificiationEvents(
126 userId, notificationEventUuids);
127 }
128
129 @Override
130 public void destroyChannel(long companyId, long userId)
131 throws ChannelException {
132
133 ChannelHub channelHub = getChannelHub(companyId);
134
135 channelHub.destroyChannel(userId);
136 }
137
138 @Override
139 public void destroyChannelHub(long companyId) throws ChannelException {
140 ChannelHub channelHub = _channelHubs.remove(companyId);
141
142 if (channelHub != null) {
143 channelHub.destroy();
144 }
145 }
146
147 @Override
148 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
149 return fetchChannelHub(companyId, false);
150 }
151
152 @Override
153 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
154 throws ChannelException {
155
156 ChannelHub channelHub = _channelHubs.get(companyId);
157
158 if (channelHub == null) {
159 synchronized(_channelHubs) {
160 channelHub = _channelHubs.get(companyId);
161
162 if (channelHub == null) {
163 if (createIfAbsent) {
164 channelHub = createChannelHub(companyId);
165 }
166 }
167 }
168 }
169
170 return channelHub;
171 }
172
173 @Override
174 public List<NotificationEvent> fetchNotificationEvents(
175 long companyId, long userId, boolean flush)
176 throws ChannelException {
177
178 ChannelHub channelHub = fetchChannelHub(companyId);
179
180 if (channelHub == null) {
181 return Collections.emptyList();
182 }
183
184 return channelHub.fetchNotificationEvents(userId, flush);
185 }
186
187 @Override
188 public void flush() throws ChannelException {
189 for (ChannelHub channelHub : _channelHubs.values()) {
190 channelHub.flush();
191 }
192 }
193
194 @Override
195 public void flush(long companyId) throws ChannelException {
196 ChannelHub channelHub = fetchChannelHub(companyId);
197
198 if (channelHub != null) {
199 channelHub.flush();
200 }
201 }
202
203 @Override
204 public void flush(long companyId, long userId, long timestamp)
205 throws ChannelException {
206
207 ChannelHub channelHub = fetchChannelHub(companyId);
208
209 if (channelHub != null) {
210 channelHub.flush(userId, timestamp);
211 }
212 }
213
214 @Override
215 public Channel getChannel(long companyId, long userId)
216 throws ChannelException {
217
218 return getChannel(companyId, userId, false);
219 }
220
221 @Override
222 public Channel getChannel(
223 long companyId, long userId, boolean createIfAbsent)
224 throws ChannelException {
225
226 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
227
228 return channelHub.getChannel(userId, createIfAbsent);
229 }
230
231 @Override
232 public ChannelHub getChannelHub(long companyId) throws ChannelException {
233 return getChannelHub(companyId, false);
234 }
235
236 @Override
237 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
238 throws ChannelException {
239
240 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
241
242 if (channelHub == null) {
243 throw new UnknownChannelHubException(
244 "No channel exists with company id " + companyId);
245 }
246
247 return channelHub;
248 }
249
250 @Override
251 public List<NotificationEvent> getNotificationEvents(
252 long companyId, long userId)
253 throws ChannelException {
254
255 ChannelHub channelHub = getChannelHub(companyId);
256
257 return channelHub.getNotificationEvents(userId);
258 }
259
260 @Override
261 public List<NotificationEvent> getNotificationEvents(
262 long companyId, long userId, boolean flush)
263 throws ChannelException {
264
265 ChannelHub channelHub = getChannelHub(companyId);
266
267 return channelHub.getNotificationEvents(userId, flush);
268 }
269
270 @Override
271 public Collection<Long> getUserIds(long companyId) throws ChannelException {
272 ChannelHub channelHub = getChannelHub(companyId);
273
274 return channelHub.getUserIds();
275 }
276
277 @Override
278 public void registerChannelListener(
279 long companyId, long userId, ChannelListener channelListener)
280 throws ChannelException {
281
282 ChannelHub channelHub = getChannelHub(companyId);
283
284 channelHub.registerChannelListener(userId, channelListener);
285 }
286
287 @Override
288 public void removeTransientNotificationEvents(
289 long companyId, long userId,
290 Collection<NotificationEvent> notificationEvents)
291 throws ChannelException {
292
293 ChannelHub channelHub = getChannelHub(companyId);
294
295 channelHub.removeTransientNotificationEvents(
296 userId, notificationEvents);
297 }
298
299 @Override
300 public void removeTransientNotificationEventsByUuid(
301 long companyId, long userId,
302 Collection<String> notificationEventUuids)
303 throws ChannelException {
304
305 ChannelHub channelHub = getChannelHub(companyId);
306
307 channelHub.removeTransientNotificationEventsByUuid(
308 userId, notificationEventUuids);
309 }
310
311 @Override
312 public void sendNotificationEvent(
313 long companyId, long userId, NotificationEvent notificationEvent)
314 throws ChannelException {
315
316 ChannelHub channelHub = getChannelHub(companyId);
317
318 channelHub.sendNotificationEvent(userId, notificationEvent);
319
320 if (!ClusterInvokeThreadLocal.isEnabled()) {
321 return;
322 }
323
324 MethodHandler methodHandler = new MethodHandler(
325 _storeNotificationEventMethodKey, companyId, userId,
326 notificationEvent);
327
328 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
329 methodHandler, true);
330
331 try {
332 ClusterExecutorUtil.execute(clusterRequest);
333 }
334 catch (Exception e) {
335 throw new ChannelException("Unable to notify cluster of event", e);
336 }
337 }
338
339 @Override
340 public void sendNotificationEvents(
341 long companyId, long userId,
342 Collection<NotificationEvent> notificationEvents)
343 throws ChannelException {
344
345 ChannelHub channelHub = getChannelHub(companyId);
346
347 channelHub.sendNotificationEvents(userId, notificationEvents);
348 }
349
350 public void setChannelHubPrototype(ChannelHub channelHub) {
351 _channelHub = channelHub;
352 }
353
354 @Override
355 public void storeNotificationEvent(
356 long companyId, long userId, NotificationEvent notificationEvent)
357 throws ChannelException {
358
359 ChannelHub channelHub = getChannelHub(companyId);
360
361 channelHub.storeNotificationEvent(userId, notificationEvent);
362 }
363
364 @Override
365 public void unregisterChannelListener(
366 long companyId, long userId, ChannelListener channelListener)
367 throws ChannelException {
368
369 ChannelHub channelHub = getChannelHub(companyId);
370
371 channelHub.unregisterChannelListener(userId, channelListener);
372 }
373
374 private static final MethodKey _storeNotificationEventMethodKey =
375 new MethodKey(
376 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
377 long.class, NotificationEvent.class);
378
379 private ChannelHub _channelHub;
380 private ConcurrentMap<Long, ChannelHub> _channelHubs =
381 new ConcurrentHashMap<Long, ChannelHub>();
382
383 }