001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.fileserver.handlers;
016    
017    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
018    import com.liferay.portal.fabric.netty.fileserver.FileResponse;
019    import com.liferay.portal.kernel.concurrent.AsyncBroker;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    
023    import io.netty.buffer.ByteBuf;
024    import io.netty.channel.ChannelHandlerContext;
025    import io.netty.channel.ChannelInboundHandlerAdapter;
026    import io.netty.channel.ChannelPipeline;
027    import io.netty.util.concurrent.EventExecutor;
028    
029    import java.io.IOException;
030    
031    import java.nio.channels.FileChannel;
032    import java.nio.file.Files;
033    import java.nio.file.Path;
034    import java.nio.file.StandardOpenOption;
035    import java.nio.file.attribute.FileTime;
036    
037    import java.util.concurrent.Callable;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public class FileUploadChannelHandler extends ChannelInboundHandlerAdapter {
043    
044            public FileUploadChannelHandler(
045                            AsyncBroker<Path, FileResponse> asyncBroker,
046                            FileResponse fileResponse, EventExecutor eventExecutor)
047                    throws IOException {
048    
049                    if (asyncBroker == null) {
050                            throw new NullPointerException("Async broker is null");
051                    }
052    
053                    if (fileResponse == null) {
054                            throw new NullPointerException("File response is null");
055                    }
056    
057                    if (eventExecutor == null) {
058                            throw new NullPointerException("Event executor is null");
059                    }
060    
061                    if (fileResponse.getSize() < 1) {
062                            throw new IllegalArgumentException(
063                                    "File response has no content for uploading");
064                    }
065    
066                    this.asyncBroker = asyncBroker;
067                    this.fileResponse = fileResponse;
068                    this.eventExecutor = eventExecutor;
069    
070                    tempFilePath = Files.createTempFile(
071                            FileUploadChannelHandler.class.getName() + "-", null);
072    
073                    fileChannel = FileChannel.open(
074                            tempFilePath, StandardOpenOption.WRITE,
075                            StandardOpenOption.TRUNCATE_EXISTING);
076    
077                    fileResponse.setLocalFile(tempFilePath);
078            }
079    
080            @Override
081            public void channelRead(
082                            ChannelHandlerContext channelHandlerContext, Object object)
083                    throws IOException {
084    
085                    ByteBuf byteBuf = (ByteBuf)object;
086    
087                    if (!receive(byteBuf)) {
088                            return;
089                    }
090    
091                    fileChannel.close();
092    
093                    if (eventExecutor.inEventLoop() || !fileResponse.isFolder()) {
094                            finish();
095                    }
096                    else {
097                            eventExecutor.submit(
098                                    new Callable<Void>() {
099    
100                                            @Override
101                                            public Void call() throws IOException {
102                                                    try {
103                                                            finish();
104                                                    }
105                                                    catch (IOException ioe) {
106                                                            exceptionCaught(null, ioe);
107                                                    }
108    
109                                                    return null;
110                                            }
111    
112                                    });
113                    }
114    
115                    ChannelPipeline channelPipeline = channelHandlerContext.pipeline();
116    
117                    channelPipeline.remove(this);
118    
119                    if (byteBuf.isReadable()) {
120                            channelHandlerContext.fireChannelRead(object);
121                    }
122            }
123    
124            @Override
125            public void exceptionCaught(
126                            ChannelHandlerContext channelHandlerContext, Throwable throwable)
127                    throws IOException {
128    
129                    _log.error("File upload failure", throwable);
130    
131                    if (channelHandlerContext != null) {
132                            ChannelPipeline channelPipeline = channelHandlerContext.pipeline();
133    
134                            channelPipeline.remove(this);
135                    }
136    
137                    if (!asyncBroker.takeWithException(fileResponse.getPath(), throwable)) {
138                            _log.error(
139                                    "Unable to place exception because no future exists with ID " +
140                                            fileResponse.getPath(),
141                                    throwable);
142                    }
143    
144                    fileChannel.close();
145    
146                    Files.delete(tempFilePath);
147            }
148    
149            protected void finish() throws IOException {
150                    if (fileResponse.isFolder()) {
151                            fileResponse.setLocalFile(
152                                    FileHelperUtil.unzip(
153                                            tempFilePath, FileHelperUtil.TEMP_DIR_PATH));
154    
155                            Files.delete(tempFilePath);
156                    }
157    
158                    Files.setLastModifiedTime(
159                            fileResponse.getLocalFile(),
160                            FileTime.fromMillis(fileResponse.getLastModifiedTime()));
161    
162                    Path path = fileResponse.getPath();
163    
164                    if (!asyncBroker.takeWithResult(path, fileResponse)) {
165                            _log.error(
166                                    "Unable to place result " + fileResponse +
167                                            " because no future exists with ID " + path);
168                    }
169            }
170    
171            protected boolean receive(ByteBuf byteBuf) throws IOException {
172                    while (true) {
173                            long readSize = fileResponse.getSize() - fileChannel.position();
174    
175                            int readableBytes = byteBuf.readableBytes();
176    
177                            if (readSize > readableBytes) {
178                                    readSize = readableBytes;
179                            }
180    
181                            if (byteBuf.readBytes(fileChannel, (int)readSize) == readSize) {
182                                    break;
183                            }
184                    }
185    
186                    if (!byteBuf.isReadable()) {
187                            byteBuf.release();
188                    }
189    
190                    if (fileChannel.position() < fileResponse.getSize()) {
191                            return false;
192                    }
193    
194                    return true;
195            }
196    
197            protected final AsyncBroker<Path, FileResponse> asyncBroker;
198            protected final EventExecutor eventExecutor;
199            protected final FileChannel fileChannel;
200            protected final FileResponse fileResponse;
201            protected final Path tempFilePath;
202    
203            private static final Log _log = LogFactoryUtil.getLog(
204                    FileUploadChannelHandler.class);
205    
206    }