Wednesday, February 13, 2013

Amazon S3 multipart upload with pause/resume functionality for .NET (Part 1)

The Amazon S3 .NET SDK has the class TransferUtility that can be easily used to upload files using multipart requests. The drawback of this class is that it lacks pause/resume functionality. I'm sure that most applications don't even need this, but if you are uploading large files and you want to pause the upload for some time, then you can't :)

In this post I will share my implementation of a class that can pause an upload that is in progress. Of course it also has some limitations (they were not a problem for my application): because Amazon imposes a 5MB minimum part size on multipart uploads, the pause function has a granularity of 5MB, i.e. the upload will pause only once the part that is currently uploading has been fully uploaded.

Another limitation of the Amazon S3 TransferUtility is that it uses the .NET Asynchronous Begin/End pattern, and the problem is that once you start an upload of, say, 700MB, it can be stopped only when:

  • the process is killed
  • the upload fails for some reason

Below is the code that can pause/resume and stop an upload. In the next post (part 2, coming soon) I will share the code for resuming an upload even after the application has been closed.

The class also contains two helper static methods that you can use to ListMultipartUploads and AbortMultipartUpload.

You can download a demo application that shows how to use the class below from here

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;

using Amazon.S3;
using Amazon.S3.Model;

namespace AmazonS3Multipart
{
    public class PausableS3MultipartUploader
    {
        protected static readonly int UPLOAD_TIMEOUT = 60 * 60 * 1000;
        protected static readonly long PART_SIZE = 5 * 1024 * 1024;

        public delegate void PausableS3MultipartUploaderReady(string Key, Exception e);
        public delegate void PausableS3MultipartUploaderProgress(string Key, int PercentDone);

        public event PausableS3MultipartUploaderReady PausableS3MultipartUploaderReadyEvent;
        public event PausableS3MultipartUploaderProgress PausableS3MultipartUploaderProgressEvent;

        private AmazonS3 _client;
        private string _bucketName = "devbt";
        private string _key = "diana123/multi/multipart.txt";
        private string _filePath = @"D:\workspace\multipart\test1.txt";
        private string _uploadId = null;

        protected volatile bool _stopped = false;
        protected volatile bool _paused = false;
        protected volatile bool _started = false;
        public bool Paused
        {
            get { return _paused; }
            set { _paused = value; }
        }

        protected double _progress = 0;
        protected double _totalWorkToDo = 0;
        protected int _percent = 0;

        private Thread _workerThread = null;

        private Exception _exception = null;

        public PausableS3MultipartUploader(string FilePath, string Key, string BucketName, AmazonS3 Client)
        {
            _filePath = FilePath;
            _key = Key;
            _client = Client;
        }

        public void Start()
        {
            if (!_started && !_stopped)
            {
                _started = true;
                _workerThread = new Thread(_work);
                _workerThread.Name = "PausableS3MultipartUploader#" + System.Guid.NewGuid().ToString();
                _workerThread.Start();
            }
        }

        public void Stop()
        {
            if (_started && !_stopped)
            {
                // stop
                _stopped = true;
                _paused = false;
                _workerThread.Abort();
            }
        }

        private void _work()
        {
            _upload();

            PausableS3MultipartUploaderReadyEvent.Invoke(_key, _exception);
            _stopped = true;
        }

        private void _handlePutObjectRequestProgress(object sender, PutObjectProgressArgs args)
        {
            _updateProgress((double)args.TransferredBytes);
        }

        protected void _putObject(Stream inputStream)
        {
            PutObjectRequest req = new PutObjectRequest()
                                .WithBucketName(_bucketName)
                                .WithKey(_key)
                                .WithAutoCloseStream(false)
                                .WithTimeout(UPLOAD_TIMEOUT);

            inputStream.Seek(0, SeekOrigin.Begin);
            req.InputStream = inputStream;
            req.PutObjectProgressEvent += new EventHandler(_handlePutObjectRequestProgress);

            _client.PutObject(req).Dispose();
        }

        // problem fix with incorrect UploadPartProgressArgs transfered bytes value
        private double _partProgress = 0;

        private void _handleUploadPartRequestProgress(object sender, UploadPartProgressArgs args)
        {
            _progress = _partProgress + ((double)(args.TotalBytes * args.PercentDone)) / 100;
            _updateProgress(0);
        }

        // partNumber: zero indexed
        protected UploadPartResponse _putPart(int partNumber, long partSize, Stream inputStream)
        {
            inputStream.Seek(partNumber * PART_SIZE, SeekOrigin.Begin);

            UploadPartRequest req = new UploadPartRequest()
                            .WithBucketName(_bucketName)
                            .WithGenerateChecksum(true)
                            .WithKey(_key)
                            .WithPartNumber(partNumber + 1)
                            .WithPartSize(partSize)
                            .WithUploadId(_uploadId)
                            .WithTimeout(UPLOAD_TIMEOUT);

            req.UploadPartProgressEvent += new EventHandler(_handleUploadPartRequestProgress);
            req.InputStream = inputStream;

            return _client.UploadPart(req);
        }

        private void _uploadMultipart(Stream inputFileStream)
        {
            _uploadId = _initiateMultipartUpload();
            Console.WriteLine("UploadId={0}", _uploadId);

            double div = _totalWorkToDo / (PART_SIZE);
            long partCount = (long)(div);
            if ((partCount * PART_SIZE) < (int)_totalWorkToDo)
                ++partCount;
            Console.WriteLine("partCount={0}", partCount);

            long partSize = 0;
            List etags = new List();

            for (int i=0; i < partCount; i++)
            {
                while (_paused == true)
                    Thread.Sleep(1000);
                if (_stopped)
                {
                    //abortMultipartUpload(_uploadId, Key);
                    break;
                }
                if (i == partCount - 1 && (partCount * PART_SIZE) > _totalWorkToDo)
                    partSize = (int)_totalWorkToDo - ((partCount - 1) * PART_SIZE); // last part
                else
                    partSize = PART_SIZE;

                Console.WriteLine("PartNo={0}, PartSize={1}", i + 1, partSize);

                UploadPartResponse partResponse = _putPart(i, partSize, inputFileStream);
                etags.Add(new PartETag(i + 1, partResponse.ETag));

                _partProgress += partSize;
            }

            _completeMultipartUpload(etags);
        }

        private string _initiateMultipartUpload()
        {
            InitiateMultipartUploadRequest req = new InitiateMultipartUploadRequest()
                    .WithBucketName(_bucketName)
                    .WithKey(_key);

            return _client.InitiateMultipartUpload(req).UploadId;
        }

        private void _completeMultipartUpload(List etags)
        {
            CompleteMultipartUploadRequest req = new CompleteMultipartUploadRequest()
                    .WithBucketName(_bucketName)
                    .WithKey(_key)
                    .WithUploadId(_uploadId)
                    .WithPartETags(etags);

            CompleteMultipartUploadResponse res = _client.CompleteMultipartUpload(req);
            Console.WriteLine("Complete => {0}", res.ETag);
        }

        private void _upload()
        {
            try
            {
                using (FileStream inputFileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
                {
                    _totalWorkToDo = inputFileStream.Length;

                    // if file size is less than 5MB then don't use multipart
                    if (_totalWorkToDo <= PART_SIZE)
                    {
                        _putObject(inputFileStream);
                    }
                    else
                    {
                        _uploadMultipart(inputFileStream);
                    }
                }
            }
            catch (ThreadAbortException e)
            {
                Console.WriteLine("Upload aborted: {0}", e.Message);
            }
            catch (Exception e)
            {
                _exception = e;
                Console.WriteLine("Upload exception: {0}", e.Message);
            }
        }

        protected void _updateProgress(double value)
        {
            _progress += value;
            if (_progress > _totalWorkToDo) _progress = _totalWorkToDo;
            int currValue = (int)((_progress * 100) / _totalWorkToDo);
            if (currValue != _percent)
            {
                _percent = currValue;
                PausableS3MultipartUploaderProgressEvent.Invoke(_key, _percent);
            }
        }

        public static List ListMultipartUploads(string BucketName, AmazonS3 Client, string Prefix = "")
        {
            ListMultipartUploadsRequest listMultipartRequest = new ListMultipartUploadsRequest()
                    .WithBucketName(BucketName)
                    .WithPrefix(Prefix);

            return Client.ListMultipartUploads(listMultipartRequest).MultipartUploads;
        }

        public static void AbortMultipartUpload(string BucketName, AmazonS3 Client, string Key, string UploadId)
        {
            AbortMultipartUploadRequest req = new AbortMultipartUploadRequest()
                        .WithBucketName(BucketName)
                        .WithKey(Key)
                        .WithUploadId(UploadId);

            Client.AbortMultipartUpload(req).Dispose();
        }
    }
}

I'm coming from the C/C++ world and C# is not my main development language, so if you see something that is not as it should be, please leave a comment and I'll change the code :D

Happy coding.

2 comments:

Unknown said...

Hi !! I have been trying to use java api since couple of weeks and this is actually working. Thanks for the help. Btw, when do you think the part 2 (Exiting an application pauses the upload) would be posted on the blog ?

Anonymous said...

Can you please upload PART 2 ?