001package com.box.sdk; 002 003import com.box.sdk.http.HttpMethod; 004import com.eclipsesource.json.Json; 005import com.eclipsesource.json.JsonObject; 006import java.io.IOException; 007import java.io.InputStream; 008import java.net.URL; 009import java.security.DigestInputStream; 010import java.security.MessageDigest; 011import java.security.NoSuchAlgorithmException; 012import java.util.ArrayList; 013import java.util.List; 014import java.util.Map; 015import java.util.concurrent.Executors; 016import java.util.concurrent.ThreadPoolExecutor; 017import java.util.concurrent.TimeUnit; 018 019/** Utility class for uploading large files. */ 020public final class LargeFileUpload { 021 private static final String DIGEST_ALGORITHM_SHA1 = "SHA1"; 022 private static final int DEFAULT_CONNECTIONS = 3; 023 private static final int DEFAULT_TIMEOUT = 1; 024 private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.HOURS; 025 private static final int THREAD_POOL_WAIT_TIME_IN_MILLIS = 1000; 026 private final ThreadPoolExecutor executorService; 027 private final long timeout; 028 private final TimeUnit timeUnit; 029 030 /** 031 * Creates a LargeFileUpload object. 032 * 033 * @param nParallelConnections number of parallel http connections to use 034 * @param timeOut time to wait before killing the job 035 * @param unit time unit for the time wait value 036 */ 037 public LargeFileUpload(int nParallelConnections, long timeOut, TimeUnit unit) { 038 this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nParallelConnections); 039 this.timeout = timeOut; 040 this.timeUnit = unit; 041 } 042 043 /** Creates a LargeFileUpload object with a default number of parallel conections and timeout. */ 044 public LargeFileUpload() { 045 this.executorService = 046 (ThreadPoolExecutor) Executors.newFixedThreadPool(LargeFileUpload.DEFAULT_CONNECTIONS); 047 this.timeout = LargeFileUpload.DEFAULT_TIMEOUT; 048 this.timeUnit = LargeFileUpload.DEFAULT_TIMEUNIT; 049 } 050 051 private static byte[] getBytesFromStream(InputStream stream, int numBytes) { 052 053 int bytesNeeded = numBytes; 054 int offset = 0; 055 byte[] bytes = new byte[numBytes]; 056 057 while (bytesNeeded > 0) { 058 059 int bytesRead; 060 try { 061 bytesRead = stream.read(bytes, offset, bytesNeeded); 062 } catch (IOException ioe) { 063 throw new BoxAPIException("Reading data from stream failed.", ioe); 064 } 065 066 if (bytesRead == -1) { 067 throw new BoxAPIException("Stream ended while upload was progressing"); 068 } 069 070 bytesNeeded = bytesNeeded - bytesRead; 071 offset = offset + bytesRead; 072 } 073 074 return bytes; 075 } 076 077 private BoxFileUploadSession.Info createUploadSession( 078 BoxAPIConnection boxApi, String folderId, URL url, String fileName, long fileSize) { 079 080 BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST); 081 082 // Create the JSON body of the request 083 JsonObject body = new JsonObject(); 084 body.add("folder_id", folderId); 085 body.add("file_name", fileName); 086 body.add("file_size", fileSize); 087 request.setBody(body.toString()); 088 089 try (BoxJSONResponse response = request.send()) { 090 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 091 092 String sessionId = jsonObject.get("id").asString(); 093 BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId); 094 095 return session.new Info(jsonObject); 096 } 097 } 098 099 /** 100 * Uploads a new large file. 101 * 102 * @param boxApi the API connection to be used by the upload session. 103 * @param folderId the id of the folder in which the file will be uploaded. 104 * @param stream the input stream that feeds the content of the file. 105 * @param url the upload session URL. 106 * @param fileName the name of the file to be created. 107 * @param fileSize the total size of the file. 108 * @return the created file instance. 109 * @throws InterruptedException when a thread gets interupted. 110 * @throws IOException when reading a stream throws exception. 111 */ 112 public BoxFile.Info upload( 113 BoxAPIConnection boxApi, 114 String folderId, 115 InputStream stream, 116 URL url, 117 String fileName, 118 long fileSize) 119 throws InterruptedException, IOException { 120 // Create a upload session 121 BoxFileUploadSession.Info session = 122 this.createUploadSession(boxApi, folderId, url, fileName, fileSize); 123 return this.uploadHelper(session, stream, fileSize, null); 124 } 125 126 /** 127 * Uploads a new large file and sets file attributes. 128 * 129 * @param boxApi the API connection to be used by the upload session. 130 * @param folderId the id of the folder in which the file will be uploaded. 131 * @param stream the input stream that feeds the content of the file. 132 * @param url the upload session URL. 133 * @param fileName the name of the file to be created. 134 * @param fileSize the total size of the file. 135 * @param fileAttributes file attributes to set 136 * @return the created file instance. 137 * @throws InterruptedException when a thread gets interupted. 138 * @throws IOException when reading a stream throws exception. 139 */ 140 public BoxFile.Info upload( 141 BoxAPIConnection boxApi, 142 String folderId, 143 InputStream stream, 144 URL url, 145 String fileName, 146 long fileSize, 147 Map<String, String> fileAttributes) 148 throws InterruptedException, IOException { 149 // Create a upload session 150 BoxFileUploadSession.Info session = 151 this.createUploadSession(boxApi, folderId, url, fileName, fileSize); 152 return this.uploadHelper(session, stream, fileSize, fileAttributes); 153 } 154 155 /** 156 * Creates a new version of a large file. 157 * 158 * @param boxApi the API connection to be used by the upload session. 159 * @param stream the input stream that feeds the content of the file. 160 * @param url the upload session URL. 161 * @param fileSize the total size of the file. 162 * @return the file instance that also contains the version information. 163 * @throws InterruptedException when a thread gets interupted. 164 * @throws IOException when reading a stream throws exception. 165 */ 166 public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize) 167 throws InterruptedException, IOException { 168 // creates a upload session 169 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize); 170 return this.uploadHelper(session, stream, fileSize, null); 171 } 172 173 /** 174 * Creates a new version of a large file and sets file attributes. 175 * 176 * @param boxApi the API connection to be used by the upload session. 177 * @param stream the input stream that feeds the content of the file. 178 * @param url the upload session URL. 179 * @param fileSize the total size of the file. 180 * @param fileAttributes file attributes to set. 181 * @return the file instance that also contains the version information. 182 * @throws InterruptedException when a thread gets interupted. 183 * @throws IOException when reading a stream throws exception. 184 */ 185 public BoxFile.Info upload( 186 BoxAPIConnection boxApi, 187 InputStream stream, 188 URL url, 189 long fileSize, 190 Map<String, String> fileAttributes) 191 throws InterruptedException, IOException { 192 // creates an upload session 193 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize); 194 return this.uploadHelper(session, stream, fileSize, fileAttributes); 195 } 196 197 private BoxFile.Info uploadHelper( 198 BoxFileUploadSession.Info session, 199 InputStream stream, 200 long fileSize, 201 Map<String, String> fileAttributes) 202 throws InterruptedException { 203 // Upload parts using the upload session 204 MessageDigest digest; 205 try { 206 digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1); 207 } catch (NoSuchAlgorithmException ae) { 208 throw new BoxAPIException("Digest algorithm not found", ae); 209 } 210 DigestInputStream dis = new DigestInputStream(stream, digest); 211 List<BoxFileUploadSessionPart> parts = this.uploadParts(session, dis, fileSize); 212 213 // Creates the file hash 214 byte[] digestBytes = digest.digest(); 215 String digestStr = Base64.encode(digestBytes); 216 217 // Commit the upload session. If there is a failure, abort the commit. 218 try { 219 return session.getResource().commit(digestStr, parts, fileAttributes, null, null); 220 } catch (Exception e) { 221 session.getResource().abort(); 222 throw new BoxAPIException("Unable to commit the upload session", e); 223 } 224 } 225 226 private BoxFileUploadSession.Info createUploadSession( 227 BoxAPIConnection boxApi, URL url, long fileSize) { 228 BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST); 229 230 // Creates the body of the request 231 JsonObject body = new JsonObject(); 232 body.add("file_size", fileSize); 233 request.setBody(body.toString()); 234 235 try (BoxJSONResponse response = request.send()) { 236 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 237 238 String sessionId = jsonObject.get("id").asString(); 239 BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId); 240 241 return session.new Info(jsonObject); 242 } 243 } 244 245 /* 246 * Upload parts of the file. The part size is retrieved from the upload session. 247 */ 248 private List<BoxFileUploadSessionPart> uploadParts( 249 BoxFileUploadSession.Info session, InputStream stream, long fileSize) 250 throws InterruptedException { 251 List<BoxFileUploadSessionPart> parts = new ArrayList<>(); 252 253 int partSize = session.getPartSize(); 254 long offset = 0; 255 long processed = 0; 256 int partPostion = 0; 257 // Set the Max Queue Size to 1.5x the number of processors 258 double maxQueueSizeDouble = Math.ceil(this.executorService.getMaximumPoolSize() * 1.5); 259 int maxQueueSize = Double.valueOf(maxQueueSizeDouble).intValue(); 260 while (processed < fileSize) { 261 // Waiting for any thread to finish before 262 long timeoutForWaitingInMillis = TimeUnit.MILLISECONDS.convert(this.timeout, this.timeUnit); 263 if (this.executorService.getCorePoolSize() <= this.executorService.getActiveCount()) { 264 if (timeoutForWaitingInMillis > 0) { 265 Thread.sleep(LargeFileUpload.THREAD_POOL_WAIT_TIME_IN_MILLIS); 266 timeoutForWaitingInMillis -= THREAD_POOL_WAIT_TIME_IN_MILLIS; 267 } else { 268 throw new BoxAPIException("Upload parts timedout"); 269 } 270 } 271 if (this.executorService.getQueue().size() < maxQueueSize) { 272 long diff = fileSize - processed; 273 // The size last part of the file can be lesser than the part size. 274 if (diff < (long) partSize) { 275 partSize = (int) diff; 276 } 277 parts.add(null); 278 byte[] bytes = getBytesFromStream(stream, partSize); 279 this.executorService.execute( 280 new LargeFileUploadTask( 281 session.getResource(), bytes, offset, partSize, fileSize, parts, partPostion)); 282 283 // Increase the offset and proceesed bytes to calculate the Content-Range header. 284 processed += partSize; 285 offset += partSize; 286 partPostion++; 287 } 288 } 289 this.executorService.shutdown(); 290 this.executorService.awaitTermination(this.timeout, this.timeUnit); 291 return parts; 292 } 293 294 /** 295 * Generates the Base64 encoded SHA-1 hash for content available in the stream. It can be used to 296 * calculate the hash of a file. 297 * 298 * @param stream the input stream of the file or data. 299 * @return the Base64 encoded hash string. 300 */ 301 public String generateDigest(InputStream stream) { 302 MessageDigest digest; 303 try { 304 digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1); 305 } catch (NoSuchAlgorithmException ae) { 306 throw new BoxAPIException("Digest algorithm not found", ae); 307 } 308 309 // Calcuate the digest using the stream. 310 DigestInputStream dis = new DigestInputStream(stream, digest); 311 try { 312 int value = dis.read(); 313 while (value != -1) { 314 value = dis.read(); 315 } 316 } catch (IOException ioe) { 317 throw new BoxAPIException("Reading the stream failed.", ioe); 318 } 319 320 // Get the calculated digest for the stream 321 byte[] digestBytes = digest.digest(); 322 return Base64.encode(digestBytes); 323 } 324}