001
014
015 package com.liferay.portal.kernel.monitoring.messaging;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.BaseMessageListener;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.monitoring.DataSample;
022 import com.liferay.portal.kernel.monitoring.DataSampleProcessor;
023 import com.liferay.portal.kernel.monitoring.Level;
024 import com.liferay.portal.kernel.monitoring.MonitoringControl;
025 import com.liferay.portal.kernel.monitoring.MonitoringException;
026 import com.liferay.portal.kernel.util.ListUtil;
027 import com.liferay.portal.kernel.util.Validator;
028 import com.liferay.registry.Registry;
029 import com.liferay.registry.RegistryUtil;
030 import com.liferay.registry.ServiceReference;
031 import com.liferay.registry.ServiceTracker;
032 import com.liferay.registry.ServiceTrackerCustomizer;
033
034 import java.util.ArrayList;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Set;
038 import java.util.concurrent.ConcurrentHashMap;
039
040
044 public class MonitoringMessageListener
045 extends BaseMessageListener implements MonitoringControl {
046
047 public void afterPropertiesSet() {
048 Registry registry = RegistryUtil.getRegistry();
049
050 _serviceTracker = registry.trackServices(
051 DataSampleProcessor.class,
052 new DataSampleProcessorServiceTrackerCustomizer());
053
054 _serviceTracker.open();
055 }
056
057 public void destroy() {
058 _serviceTracker.close();
059
060 _serviceTracker = null;
061 }
062
063 @Override
064 public Level getLevel(String namespace) {
065 Level level = _levels.get(namespace);
066
067 if (level == null) {
068 return Level.OFF;
069 }
070
071 return level;
072 }
073
074 @Override
075 public Set<String> getNamespaces() {
076 return _levels.keySet();
077 }
078
079 public void processDataSample(DataSample dataSample)
080 throws MonitoringException {
081
082 String namespace = dataSample.getNamespace();
083
084 Level level = _levels.get(namespace);
085
086 if ((level != null) && level.equals(Level.OFF)) {
087 return;
088 }
089
090 List<DataSampleProcessor<DataSample>> dataSampleProcessors =
091 _dataSampleProcessors.get(namespace);
092
093 if ((dataSampleProcessors == null) || dataSampleProcessors.isEmpty()) {
094 return;
095 }
096
097 for (DataSampleProcessor<DataSample> dataSampleProcessor :
098 dataSampleProcessors) {
099
100 dataSampleProcessor.processDataSample(dataSample);
101 }
102 }
103
104 public synchronized void registerDataSampleProcessor(
105 String namespace, DataSampleProcessor<DataSample> dataSampleProcessor) {
106
107 List<DataSampleProcessor<DataSample>> dataSampleProcessors =
108 _dataSampleProcessors.get(namespace);
109
110 if (dataSampleProcessors == null) {
111 dataSampleProcessors = new ArrayList<>();
112
113 _dataSampleProcessors.put(namespace, dataSampleProcessors);
114 }
115
116 dataSampleProcessors.add(dataSampleProcessor);
117 }
118
119 @Override
120 public void setLevel(String namespace, Level level) {
121 _levels.put(namespace, level);
122 }
123
124 public void setLevels(Map<String, String> levels) {
125 for (Map.Entry<String, String> entry : levels.entrySet()) {
126 String namespace = entry.getKey();
127 String levelName = entry.getValue();
128
129 Level level = Level.valueOf(levelName);
130
131 _levels.put(namespace, level);
132 }
133 }
134
135 public synchronized void unregisterDataSampleProcessor(
136 String namespace, DataSampleProcessor<DataSample> dataSampleProcessor) {
137
138 List<DataSampleProcessor<DataSample>> dataSampleProcessors =
139 _dataSampleProcessors.get(namespace);
140
141 if (dataSampleProcessors != null) {
142 dataSampleProcessors.remove(dataSampleProcessor);
143 }
144 }
145
146 @Override
147 @SuppressWarnings("unchecked")
148 protected void doReceive(Message message) throws Exception {
149 List<DataSample> dataSamples = (List<DataSample>)message.getPayload();
150
151 if (ListUtil.isNotEmpty(dataSamples)) {
152 for (DataSample dataSample : dataSamples) {
153 processDataSample(dataSample);
154 }
155 }
156 }
157
158 private static final Log _log = LogFactoryUtil.getLog(
159 MonitoringMessageListener.class);
160
161 private final Map<String, List<DataSampleProcessor<DataSample>>>
162 _dataSampleProcessors = new ConcurrentHashMap<>();
163 private final Map<String, Level> _levels = new ConcurrentHashMap<>();
164
165 @SuppressWarnings("rawtypes")
166 private ServiceTracker<DataSampleProcessor, DataSampleProcessor>
167 _serviceTracker;
168
169 @SuppressWarnings("rawtypes")
170 private class DataSampleProcessorServiceTrackerCustomizer
171 implements ServiceTrackerCustomizer
172 <DataSampleProcessor, DataSampleProcessor> {
173
174 @Override
175 public DataSampleProcessor addingService(
176 ServiceReference<DataSampleProcessor> serviceReference) {
177
178 Registry registry = RegistryUtil.getRegistry();
179
180 String namespace = (String)serviceReference.getProperty(
181 "namespace");
182
183 DataSampleProcessor<DataSample> dataSampleProcessor =
184 registry.getService(serviceReference);
185
186 if (Validator.isNotNull(namespace)) {
187 registerDataSampleProcessor(namespace, dataSampleProcessor);
188 }
189 else {
190 if (_log.isWarnEnabled()) {
191 _log.warn(
192 "No namespace defined for service " +
193 dataSampleProcessor.getClass());
194 }
195 }
196
197 return dataSampleProcessor;
198 }
199
200 @Override
201 public void modifiedService(
202 ServiceReference<DataSampleProcessor> serviceReference,
203 DataSampleProcessor dataSampleProcessor) {
204 }
205
206 @Override
207 public void removedService(
208 ServiceReference<DataSampleProcessor> serviceReference,
209 DataSampleProcessor dataSampleProcessor) {
210
211 String namespace = (String)serviceReference.getProperty(
212 "namespace");
213
214 if (Validator.isNull(namespace)) {
215 if (_log.isWarnEnabled()) {
216 _log.warn(
217 "No namespace defined for service " +
218 dataSampleProcessor.getClass());
219 }
220
221 return;
222 }
223
224 Registry registry = RegistryUtil.getRegistry();
225
226 dataSampleProcessor = registry.getService(serviceReference);
227
228 unregisterDataSampleProcessor(namespace, dataSampleProcessor);
229 }
230
231 }
232
233 }