001package com.box.sdk;
002
003import com.box.sdk.http.HttpMethod;
004import com.eclipsesource.json.Json;
005import com.eclipsesource.json.JsonObject;
006import java.io.IOException;
007import java.io.InputStream;
008import java.net.URL;
009import java.security.DigestInputStream;
010import java.security.MessageDigest;
011import java.security.NoSuchAlgorithmException;
012import java.util.ArrayList;
013import java.util.List;
014import java.util.Map;
015import java.util.concurrent.Executors;
016import java.util.concurrent.ThreadPoolExecutor;
017import java.util.concurrent.TimeUnit;
018
019/** Utility class for uploading large files. */
020public final class LargeFileUpload {
021  private static final String DIGEST_ALGORITHM_SHA1 = "SHA1";
022  private static final int DEFAULT_CONNECTIONS = 3;
023  private static final int DEFAULT_TIMEOUT = 1;
024  private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.HOURS;
025  private static final int THREAD_POOL_WAIT_TIME_IN_MILLIS = 1000;
026  private final ThreadPoolExecutor executorService;
027  private final long timeout;
028  private final TimeUnit timeUnit;
029
030  /**
031   * Creates a LargeFileUpload object.
032   *
033   * @param nParallelConnections number of parallel http connections to use
034   * @param timeOut time to wait before killing the job
035   * @param unit time unit for the time wait value
036   */
037  public LargeFileUpload(int nParallelConnections, long timeOut, TimeUnit unit) {
038    this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nParallelConnections);
039    this.timeout = timeOut;
040    this.timeUnit = unit;
041  }
042
043  /** Creates a LargeFileUpload object with a default number of parallel conections and timeout. */
044  public LargeFileUpload() {
045    this.executorService =
046        (ThreadPoolExecutor) Executors.newFixedThreadPool(LargeFileUpload.DEFAULT_CONNECTIONS);
047    this.timeout = LargeFileUpload.DEFAULT_TIMEOUT;
048    this.timeUnit = LargeFileUpload.DEFAULT_TIMEUNIT;
049  }
050
051  private static byte[] getBytesFromStream(InputStream stream, int numBytes) {
052
053    int bytesNeeded = numBytes;
054    int offset = 0;
055    byte[] bytes = new byte[numBytes];
056
057    while (bytesNeeded > 0) {
058
059      int bytesRead;
060      try {
061        bytesRead = stream.read(bytes, offset, bytesNeeded);
062      } catch (IOException ioe) {
063        throw new BoxAPIException("Reading data from stream failed.", ioe);
064      }
065
066      if (bytesRead == -1) {
067        throw new BoxAPIException("Stream ended while upload was progressing");
068      }
069
070      bytesNeeded = bytesNeeded - bytesRead;
071      offset = offset + bytesRead;
072    }
073
074    return bytes;
075  }
076
077  private BoxFileUploadSession.Info createUploadSession(
078      BoxAPIConnection boxApi, String folderId, URL url, String fileName, long fileSize) {
079
080    BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST);
081
082    // Create the JSON body of the request
083    JsonObject body = new JsonObject();
084    body.add("folder_id", folderId);
085    body.add("file_name", fileName);
086    body.add("file_size", fileSize);
087    request.setBody(body.toString());
088
089    try (BoxJSONResponse response = request.send()) {
090      JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
091
092      String sessionId = jsonObject.get("id").asString();
093      BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId);
094
095      return session.new Info(jsonObject);
096    }
097  }
098
099  /**
100   * Uploads a new large file.
101   *
102   * @param boxApi the API connection to be used by the upload session.
103   * @param folderId the id of the folder in which the file will be uploaded.
104   * @param stream the input stream that feeds the content of the file.
105   * @param url the upload session URL.
106   * @param fileName the name of the file to be created.
107   * @param fileSize the total size of the file.
108   * @return the created file instance.
109   * @throws InterruptedException when a thread gets interupted.
110   * @throws IOException when reading a stream throws exception.
111   */
112  public BoxFile.Info upload(
113      BoxAPIConnection boxApi,
114      String folderId,
115      InputStream stream,
116      URL url,
117      String fileName,
118      long fileSize)
119      throws InterruptedException, IOException {
120    // Create a upload session
121    BoxFileUploadSession.Info session =
122        this.createUploadSession(boxApi, folderId, url, fileName, fileSize);
123    return this.uploadHelper(session, stream, fileSize, null);
124  }
125
126  /**
127   * Uploads a new large file and sets file attributes.
128   *
129   * @param boxApi the API connection to be used by the upload session.
130   * @param folderId the id of the folder in which the file will be uploaded.
131   * @param stream the input stream that feeds the content of the file.
132   * @param url the upload session URL.
133   * @param fileName the name of the file to be created.
134   * @param fileSize the total size of the file.
135   * @param fileAttributes file attributes to set
136   * @return the created file instance.
137   * @throws InterruptedException when a thread gets interupted.
138   * @throws IOException when reading a stream throws exception.
139   */
140  public BoxFile.Info upload(
141      BoxAPIConnection boxApi,
142      String folderId,
143      InputStream stream,
144      URL url,
145      String fileName,
146      long fileSize,
147      Map<String, String> fileAttributes)
148      throws InterruptedException, IOException {
149    // Create a upload session
150    BoxFileUploadSession.Info session =
151        this.createUploadSession(boxApi, folderId, url, fileName, fileSize);
152    return this.uploadHelper(session, stream, fileSize, fileAttributes);
153  }
154
155  /**
156   * Creates a new version of a large file.
157   *
158   * @param boxApi the API connection to be used by the upload session.
159   * @param stream the input stream that feeds the content of the file.
160   * @param url the upload session URL.
161   * @param fileSize the total size of the file.
162   * @return the file instance that also contains the version information.
163   * @throws InterruptedException when a thread gets interupted.
164   * @throws IOException when reading a stream throws exception.
165   */
166  public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize)
167      throws InterruptedException, IOException {
168    // creates a upload session
169    BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize);
170    return this.uploadHelper(session, stream, fileSize, null);
171  }
172
173  /**
174   * Creates a new version of a large file and sets file attributes.
175   *
176   * @param boxApi the API connection to be used by the upload session.
177   * @param stream the input stream that feeds the content of the file.
178   * @param url the upload session URL.
179   * @param fileSize the total size of the file.
180   * @param fileAttributes file attributes to set.
181   * @return the file instance that also contains the version information.
182   * @throws InterruptedException when a thread gets interupted.
183   * @throws IOException when reading a stream throws exception.
184   */
185  public BoxFile.Info upload(
186      BoxAPIConnection boxApi,
187      InputStream stream,
188      URL url,
189      long fileSize,
190      Map<String, String> fileAttributes)
191      throws InterruptedException, IOException {
192    // creates an upload session
193    BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize);
194    return this.uploadHelper(session, stream, fileSize, fileAttributes);
195  }
196
197  private BoxFile.Info uploadHelper(
198      BoxFileUploadSession.Info session,
199      InputStream stream,
200      long fileSize,
201      Map<String, String> fileAttributes)
202      throws InterruptedException {
203    // Upload parts using the upload session
204    MessageDigest digest;
205    try {
206      digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1);
207    } catch (NoSuchAlgorithmException ae) {
208      throw new BoxAPIException("Digest algorithm not found", ae);
209    }
210    DigestInputStream dis = new DigestInputStream(stream, digest);
211    List<BoxFileUploadSessionPart> parts = this.uploadParts(session, dis, fileSize);
212
213    // Creates the file hash
214    byte[] digestBytes = digest.digest();
215    String digestStr = Base64.encode(digestBytes);
216
217    // Commit the upload session. If there is a failure, abort the commit.
218    try {
219      return session.getResource().commit(digestStr, parts, fileAttributes, null, null);
220    } catch (Exception e) {
221      session.getResource().abort();
222      throw new BoxAPIException("Unable to commit the upload session", e);
223    }
224  }
225
226  private BoxFileUploadSession.Info createUploadSession(
227      BoxAPIConnection boxApi, URL url, long fileSize) {
228    BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST);
229
230    // Creates the body of the request
231    JsonObject body = new JsonObject();
232    body.add("file_size", fileSize);
233    request.setBody(body.toString());
234
235    try (BoxJSONResponse response = request.send()) {
236      JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
237
238      String sessionId = jsonObject.get("id").asString();
239      BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId);
240
241      return session.new Info(jsonObject);
242    }
243  }
244
245  /*
246   * Upload parts of the file. The part size is retrieved from the upload session.
247   */
248  private List<BoxFileUploadSessionPart> uploadParts(
249      BoxFileUploadSession.Info session, InputStream stream, long fileSize)
250      throws InterruptedException {
251    List<BoxFileUploadSessionPart> parts = new ArrayList<>();
252
253    int partSize = session.getPartSize();
254    long offset = 0;
255    long processed = 0;
256    int partPostion = 0;
257    // Set the Max Queue Size to 1.5x the number of processors
258    double maxQueueSizeDouble = Math.ceil(this.executorService.getMaximumPoolSize() * 1.5);
259    int maxQueueSize = Double.valueOf(maxQueueSizeDouble).intValue();
260    while (processed < fileSize) {
261      // Waiting for any thread to finish before
262      long timeoutForWaitingInMillis = TimeUnit.MILLISECONDS.convert(this.timeout, this.timeUnit);
263      if (this.executorService.getCorePoolSize() <= this.executorService.getActiveCount()) {
264        if (timeoutForWaitingInMillis > 0) {
265          Thread.sleep(LargeFileUpload.THREAD_POOL_WAIT_TIME_IN_MILLIS);
266          timeoutForWaitingInMillis -= THREAD_POOL_WAIT_TIME_IN_MILLIS;
267        } else {
268          throw new BoxAPIException("Upload parts timedout");
269        }
270      }
271      if (this.executorService.getQueue().size() < maxQueueSize) {
272        long diff = fileSize - processed;
273        // The size last part of the file can be lesser than the part size.
274        if (diff < (long) partSize) {
275          partSize = (int) diff;
276        }
277        parts.add(null);
278        byte[] bytes = getBytesFromStream(stream, partSize);
279        this.executorService.execute(
280            new LargeFileUploadTask(
281                session.getResource(), bytes, offset, partSize, fileSize, parts, partPostion));
282
283        // Increase the offset and proceesed bytes to calculate the Content-Range header.
284        processed += partSize;
285        offset += partSize;
286        partPostion++;
287      }
288    }
289    this.executorService.shutdown();
290    this.executorService.awaitTermination(this.timeout, this.timeUnit);
291    return parts;
292  }
293
294  /**
295   * Generates the Base64 encoded SHA-1 hash for content available in the stream. It can be used to
296   * calculate the hash of a file.
297   *
298   * @param stream the input stream of the file or data.
299   * @return the Base64 encoded hash string.
300   */
301  public String generateDigest(InputStream stream) {
302    MessageDigest digest;
303    try {
304      digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1);
305    } catch (NoSuchAlgorithmException ae) {
306      throw new BoxAPIException("Digest algorithm not found", ae);
307    }
308
309    // Calcuate the digest using the stream.
310    DigestInputStream dis = new DigestInputStream(stream, digest);
311    try {
312      int value = dis.read();
313      while (value != -1) {
314        value = dis.read();
315      }
316    } catch (IOException ioe) {
317      throw new BoxAPIException("Reading the stream failed.", ioe);
318    }
319
320    // Get the calculated digest for the stream
321    byte[] digestBytes = digest.digest();
322    return Base64.encode(digestBytes);
323  }
324}