001
014
015 package com.liferay.portal.fabric.netty.repository;
016
017 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018 import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019 import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020 import com.liferay.portal.fabric.netty.util.NettyUtil;
021 import com.liferay.portal.fabric.repository.Repository;
022 import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
023 import com.liferay.portal.kernel.concurrent.AsyncBroker;
024 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
025 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
027 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030
031 import io.netty.channel.Channel;
032 import io.netty.channel.ChannelFuture;
033 import io.netty.channel.ChannelFutureListener;
034
035 import java.io.IOException;
036
037 import java.nio.file.Files;
038 import java.nio.file.Path;
039 import java.nio.file.StandardCopyOption;
040 import java.nio.file.attribute.FileTime;
041
042 import java.util.Iterator;
043 import java.util.Map;
044 import java.util.Set;
045 import java.util.concurrent.ConcurrentHashMap;
046 import java.util.concurrent.Future;
047 import java.util.concurrent.atomic.AtomicInteger;
048
049
052 public class NettyRepository implements Repository<Channel> {
053
054 public NettyRepository(Path repositoryPath, long getFileTimeout) {
055 if (repositoryPath == null) {
056 throw new NullPointerException("Repository path is null");
057 }
058
059 if (!Files.isDirectory(repositoryPath)) {
060 throw new IllegalArgumentException(
061 repositoryPath + " is not a directory");
062 }
063
064 this.repositoryPath = repositoryPath;
065 this.getFileTimeout = getFileTimeout;
066 }
067
068 @Override
069 public void dispose(boolean delete) {
070 Set<Map.Entry<Path, Path>> entrySet = pathMap.entrySet();
071
072 Iterator<Map.Entry<Path, Path>> iterator = entrySet.iterator();
073
074 while (iterator.hasNext()) {
075 Map.Entry<Path, Path> entry = iterator.next();
076
077 iterator.remove();
078
079 FileHelperUtil.delete(true, entry.getValue());
080 }
081
082 if (delete) {
083 FileHelperUtil.delete(true, repositoryPath);
084 }
085 }
086
087 @Override
088 public AsyncBroker<Path, FileResponse> getAsyncBroker() {
089 return asyncBroker;
090 }
091
092 @Override
093 public NoticeableFuture<Path> getFile(
094 Channel channel, Path remoteFilePath, Path localFilePath,
095 boolean deleteAfterFetch) {
096
097 if (localFilePath == null) {
098 return getFile(
099 channel, remoteFilePath,
100 RepositoryHelperUtil.getRepositoryFilePath(
101 repositoryPath, remoteFilePath),
102 deleteAfterFetch, true);
103 }
104
105 return getFile(
106 channel, remoteFilePath, localFilePath, deleteAfterFetch, false);
107 }
108
109 @Override
110 public NoticeableFuture<Map<Path, Path>> getFiles(
111 Channel channel, Map<Path, Path> pathMap, boolean deleteAfterFetch) {
112
113 final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
114 new DefaultNoticeableFuture<>();
115
116 if (pathMap.isEmpty()) {
117 defaultNoticeableFuture.set(pathMap);
118
119 return defaultNoticeableFuture;
120 }
121
122 final Map<Path, Path> resultPathMap = new ConcurrentHashMap<>();
123
124 final AtomicInteger counter = new AtomicInteger(pathMap.size());
125
126 for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
127 final Path remoteFilePath = entry.getKey();
128
129 NoticeableFuture<Path> noticeableFuture = getFile(
130 channel, remoteFilePath, entry.getValue(), deleteAfterFetch);
131
132 noticeableFuture.addFutureListener(
133 new BaseFutureListener<Path>() {
134
135 @Override
136 public void completeWithCancel(Future<Path> future) {
137 defaultNoticeableFuture.cancel(true);
138 }
139
140 @Override
141 public void completeWithException(
142 Future<Path> future, Throwable throwable) {
143
144 defaultNoticeableFuture.setException(throwable);
145 }
146
147 @Override
148 public void completeWithResult(
149 Future<Path> future, Path localFilePath) {
150
151 if (localFilePath != null) {
152 resultPathMap.put(remoteFilePath, localFilePath);
153 }
154
155 if (counter.decrementAndGet() <= 0) {
156 defaultNoticeableFuture.set(resultPathMap);
157 }
158 }
159
160 });
161 }
162
163 return defaultNoticeableFuture;
164 }
165
166 @Override
167 public Path getRepositoryPath() {
168 return repositoryPath;
169 }
170
171 protected static long getLastModifiedTime(Path path) {
172 if (path == null) {
173 return Long.MIN_VALUE;
174 }
175
176 try {
177 FileTime fileTime = Files.getLastModifiedTime(path);
178
179 return fileTime.toMillis();
180 }
181 catch (IOException ioe) {
182 return Long.MIN_VALUE;
183 }
184 }
185
186 protected NoticeableFuture<Path> getFile(
187 Channel channel, final Path remoteFilePath, final Path localFilePath,
188 boolean deleteAfterFetch, final boolean populateCache) {
189
190 if (_log.isDebugEnabled()) {
191 _log.debug("Fetching remote file " + remoteFilePath);
192 }
193
194 final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
195
196 final DefaultNoticeableFuture<FileResponse> defaultNoticeableFuture =
197 new DefaultNoticeableFuture<>();
198
199 NoticeableFuture<FileResponse> noticeableFuture = asyncBroker.post(
200 remoteFilePath, defaultNoticeableFuture);
201
202 if (noticeableFuture == null) {
203 noticeableFuture = defaultNoticeableFuture;
204
205 NettyUtil.scheduleCancellation(
206 channel, defaultNoticeableFuture, getFileTimeout);
207
208 ChannelFuture channelFuture = channel.writeAndFlush(
209 new FileRequest(
210 remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
211 deleteAfterFetch));
212
213 channelFuture.addListener(
214 new ChannelFutureListener() {
215
216 @Override
217 public void operationComplete(ChannelFuture channelFuture) {
218 if (channelFuture.isSuccess()) {
219 return;
220 }
221
222 if (channelFuture.isCancelled()) {
223 defaultNoticeableFuture.cancel(true);
224
225 return;
226 }
227
228 Throwable throwable = new IOException(
229 "Unable to fetch remote file " + remoteFilePath,
230 channelFuture.cause());
231
232 if (!asyncBroker.takeWithException(
233 remoteFilePath, throwable)) {
234
235 _log.error(
236 "Unable to place exception because no future " +
237 "exists with ID " + remoteFilePath,
238 throwable);
239 }
240 }
241
242 });
243 }
244
245 return new NoticeableFutureConverter<Path, FileResponse>(
246 noticeableFuture) {
247
248 @Override
249 protected Path convert(FileResponse fileResponse)
250 throws IOException {
251
252 if (fileResponse.isFileNotFound()) {
253 if (_log.isWarnEnabled()) {
254 _log.warn(
255 "Remote file " + remoteFilePath + " is not found");
256 }
257
258 return null;
259 }
260
261 if (fileResponse.isFileNotModified()) {
262 if (_log.isDebugEnabled()) {
263 _log.debug(
264 "Remote file " + remoteFilePath +
265 " is not modified, use cached local file " +
266 cachedLocalFilePath);
267 }
268
269 return cachedLocalFilePath;
270 }
271
272 Path targetLocalFilePath = localFilePath;
273
274 synchronized (fileResponse) {
275 Path recheckCacheLocalFilePath = pathMap.get(
276 remoteFilePath);
277
278 if (recheckCacheLocalFilePath != null) {
279 targetLocalFilePath = recheckCacheLocalFilePath;
280 }
281
282 Path tempLocalFilePath = fileResponse.getLocalFile();
283
284 if (tempLocalFilePath.startsWith(repositoryPath)) {
285 Files.copy(
286 fileResponse.getLocalFile(), targetLocalFilePath,
287 StandardCopyOption.REPLACE_EXISTING);
288 }
289 else {
290 Files.move(
291 fileResponse.getLocalFile(), targetLocalFilePath,
292 StandardCopyOption.REPLACE_EXISTING);
293 }
294
295 if (populateCache) {
296 pathMap.put(remoteFilePath, targetLocalFilePath);
297 }
298
299 fileResponse.setLocalFile(targetLocalFilePath);
300 }
301
302 if (_log.isDebugEnabled()) {
303 _log.debug(
304 "Fetched remote file " + remoteFilePath + " to " +
305 targetLocalFilePath);
306 }
307
308 return targetLocalFilePath;
309 }
310
311 };
312 }
313
314 protected final AsyncBroker<Path, FileResponse> asyncBroker =
315 new AsyncBroker<>();
316 protected final long getFileTimeout;
317 protected final Map<Path, Path> pathMap = new ConcurrentHashMap<>();
318 protected final Path repositoryPath;
319
320 private static final Log _log = LogFactoryUtil.getLog(
321 NettyRepository.class);
322
323 }