001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.repository;
016    
017    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018    import com.liferay.portal.fabric.netty.fileserver.FileRequest;
019    import com.liferay.portal.fabric.netty.fileserver.FileResponse;
020    import com.liferay.portal.fabric.netty.util.NettyUtil;
021    import com.liferay.portal.fabric.repository.Repository;
022    import com.liferay.portal.fabric.repository.RepositoryHelperUtil;
023    import com.liferay.portal.kernel.concurrent.AsyncBroker;
024    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
025    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
026    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
027    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    
031    import io.netty.channel.Channel;
032    import io.netty.channel.ChannelFuture;
033    import io.netty.channel.ChannelFutureListener;
034    
035    import java.io.IOException;
036    
037    import java.nio.file.Files;
038    import java.nio.file.Path;
039    import java.nio.file.StandardCopyOption;
040    import java.nio.file.attribute.FileTime;
041    
042    import java.util.Map;
043    import java.util.concurrent.ConcurrentHashMap;
044    import java.util.concurrent.Future;
045    import java.util.concurrent.atomic.AtomicInteger;
046    
047    /**
048     * @author Shuyang Zhou
049     */
050    public class NettyRepository implements Repository<Channel> {
051    
052            public NettyRepository(Path repositoryPath, long getFileTimeout) {
053                    if (repositoryPath == null) {
054                            throw new NullPointerException("Repository path is null");
055                    }
056    
057                    if (!Files.isDirectory(repositoryPath)) {
058                            throw new IllegalArgumentException(
059                                    repositoryPath + " is not a directory");
060                    }
061    
062                    this.repositoryPath = repositoryPath;
063                    this.getFileTimeout = getFileTimeout;
064            }
065    
066            @Override
067            public void dispose(boolean delete) {
068                    for (Path path : pathMap.values()) {
069                            FileHelperUtil.delete(true, path);
070                    }
071    
072                    pathMap.clear();
073    
074                    if (delete) {
075                            FileHelperUtil.delete(true, repositoryPath);
076                    }
077            }
078    
079            @Override
080            public AsyncBroker<Path, FileResponse> getAsyncBroker() {
081                    return asyncBroker;
082            }
083    
084            @Override
085            public NoticeableFuture<Path> getFile(
086                    Channel channel, Path remoteFilePath, Path localFilePath,
087                    boolean deleteAfterFetch) {
088    
089                    if (localFilePath == null) {
090                            return getFile(
091                                    channel, remoteFilePath,
092                                    RepositoryHelperUtil.getRepositoryFilePath(
093                                            repositoryPath, remoteFilePath),
094                                    deleteAfterFetch, true);
095                    }
096    
097                    return getFile(
098                            channel, remoteFilePath, localFilePath, deleteAfterFetch, false);
099            }
100    
101            @Override
102            public NoticeableFuture<Map<Path, Path>> getFiles(
103                    Channel channel, Map<Path, Path> pathMap, boolean deleteAfterFetch) {
104    
105                    final DefaultNoticeableFuture<Map<Path, Path>> defaultNoticeableFuture =
106                            new DefaultNoticeableFuture<Map<Path, Path>>();
107    
108                    if (pathMap.isEmpty()) {
109                            defaultNoticeableFuture.set(pathMap);
110    
111                            return defaultNoticeableFuture;
112                    }
113    
114                    final Map<Path, Path> resultPathMap =
115                            new ConcurrentHashMap<Path, Path>();
116    
117                    final AtomicInteger counter = new AtomicInteger(pathMap.size());
118    
119                    for (Map.Entry<Path, Path> entry : pathMap.entrySet()) {
120                            final Path remoteFilePath = entry.getKey();
121    
122                            NoticeableFuture<Path> noticeableFuture = getFile(
123                                    channel, remoteFilePath, entry.getValue(), deleteAfterFetch);
124    
125                            noticeableFuture.addFutureListener(
126                                    new BaseFutureListener<Path>() {
127    
128                                            @Override
129                                            public void completeWithCancel(Future<Path> future) {
130                                                    defaultNoticeableFuture.cancel(true);
131                                            }
132    
133                                            @Override
134                                            public void completeWithException(
135                                                    Future<Path> future, Throwable throwable) {
136    
137                                                    defaultNoticeableFuture.setException(throwable);
138                                            }
139    
140                                            @Override
141                                            public void completeWithResult(
142                                                    Future<Path> future, Path localFilePath) {
143    
144                                                    if (localFilePath != null) {
145                                                            resultPathMap.put(remoteFilePath, localFilePath);
146                                                    }
147    
148                                                    if (counter.decrementAndGet() <= 0) {
149                                                            defaultNoticeableFuture.set(resultPathMap);
150                                                    }
151                                            }
152    
153                                    });
154                    }
155    
156                    return defaultNoticeableFuture;
157            }
158    
159            @Override
160            public Path getRepositoryPath() {
161                    return repositoryPath;
162            }
163    
164            protected static long getLastModifiedTime(Path path) {
165                    if (path == null) {
166                            return Long.MIN_VALUE;
167                    }
168    
169                    try {
170                            FileTime fileTime = Files.getLastModifiedTime(path);
171    
172                            return fileTime.toMillis();
173                    }
174                    catch (IOException ioe) {
175                            return Long.MIN_VALUE;
176                    }
177            }
178    
179            protected NoticeableFuture<Path> getFile(
180                    Channel channel, final Path remoteFilePath, final Path localFilePath,
181                    boolean deleteAfterFetch, final boolean populateCache) {
182    
183                    if (_log.isDebugEnabled()) {
184                            _log.debug("Fetching remote file " + remoteFilePath);
185                    }
186    
187                    final Path cachedLocalFilePath = pathMap.get(remoteFilePath);
188    
189                    final DefaultNoticeableFuture<FileResponse> defaultNoticeableFuture =
190                            new DefaultNoticeableFuture<FileResponse>();
191    
192                    NoticeableFuture<FileResponse> noticeableFuture = asyncBroker.post(
193                            remoteFilePath, defaultNoticeableFuture);
194    
195                    if (noticeableFuture == null) {
196                            noticeableFuture = defaultNoticeableFuture;
197    
198                            NettyUtil.scheduleCancellation(
199                                    channel, defaultNoticeableFuture, getFileTimeout);
200    
201                            ChannelFuture channelFuture = channel.writeAndFlush(
202                                    new FileRequest(
203                                            remoteFilePath, getLastModifiedTime(cachedLocalFilePath),
204                                            deleteAfterFetch));
205    
206                            channelFuture.addListener(
207                                    new ChannelFutureListener() {
208    
209                                            @Override
210                                            public void operationComplete(ChannelFuture channelFuture) {
211                                                    if (channelFuture.isSuccess()) {
212                                                            return;
213                                                    }
214    
215                                                    if (channelFuture.isCancelled()) {
216                                                            defaultNoticeableFuture.cancel(true);
217    
218                                                            return;
219                                                    }
220    
221                                                    Throwable throwable = new IOException(
222                                                            "Unable to fetch remote file " + remoteFilePath,
223                                                            channelFuture.cause());
224    
225                                                    if (!asyncBroker.takeWithException(
226                                                                    remoteFilePath, throwable)) {
227    
228                                                            _log.error(
229                                                                    "Unable to place exception because no future " +
230                                                                            "exists with ID " + remoteFilePath,
231                                                                    throwable);
232                                                    }
233                                            }
234    
235                                    });
236                    }
237    
238                    return new NoticeableFutureConverter<Path, FileResponse>(
239                            noticeableFuture) {
240    
241                                    @Override
242                                    protected Path convert(FileResponse fileResponse)
243                                            throws IOException {
244    
245                                            if (fileResponse.isFileNotFound()) {
246                                                    if (_log.isWarnEnabled()) {
247                                                            _log.warn(
248                                                                    "Remote file " + remoteFilePath +
249                                                                            " is not found");
250                                                    }
251    
252                                                    return null;
253                                            }
254    
255                                            if (fileResponse.isFileNotModified()) {
256                                                    if (_log.isDebugEnabled()) {
257                                                            _log.debug(
258                                                                    "Remote file " + remoteFilePath +
259                                                                            " is not modified, use cached local file " +
260                                                                                    cachedLocalFilePath);
261                                                    }
262    
263                                                    return cachedLocalFilePath;
264                                            }
265    
266                                            Path targetLocalFilePath = localFilePath;
267    
268                                            synchronized (fileResponse) {
269                                                    Path recheckCacheLocalFilePath = pathMap.get(
270                                                            remoteFilePath);
271    
272                                                    if (recheckCacheLocalFilePath != null) {
273                                                            targetLocalFilePath = recheckCacheLocalFilePath;
274                                                    }
275    
276                                                    Path tempLocalFilePath = fileResponse.getLocalFile();
277    
278                                                    if (tempLocalFilePath.startsWith(repositoryPath)) {
279                                                            Files.copy(
280                                                                    fileResponse.getLocalFile(),
281                                                                    targetLocalFilePath,
282                                                                    StandardCopyOption.REPLACE_EXISTING);
283                                                    }
284                                                    else {
285                                                            Files.move(
286                                                                    fileResponse.getLocalFile(),
287                                                                    targetLocalFilePath,
288                                                                    StandardCopyOption.REPLACE_EXISTING);
289                                                    }
290    
291                                                    if (populateCache) {
292                                                            pathMap.put(remoteFilePath, targetLocalFilePath);
293                                                    }
294    
295                                                    fileResponse.setLocalFile(targetLocalFilePath);
296                                            }
297    
298                                            if (_log.isDebugEnabled()) {
299                                                    _log.debug(
300                                                            "Fetched remote file " + remoteFilePath + " to " +
301                                                                    targetLocalFilePath);
302                                            }
303    
304                                            return targetLocalFilePath;
305                                    }
306    
307                            };
308            }
309    
310            protected final AsyncBroker<Path, FileResponse> asyncBroker =
311                    new AsyncBroker<Path, FileResponse>();
312            protected final long getFileTimeout;
313            protected final Map<Path, Path> pathMap =
314                    new ConcurrentHashMap<Path, Path>();
315            protected final Path repositoryPath;
316    
317            private static final Log _log = LogFactoryUtil.getLog(
318                    NettyRepository.class);
319    
320    }