001/*
002 * Copyright 2018 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2018 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.IOException;
026import java.io.OutputStream;
027
028
029
030/**
031 * This class provides an {@code OutputStream} implementation that uses a
032 * {@link FixedRateBarrier} to impose an upper bound on the rate (in bytes per
033 * second) at which data can be written to a wrapped {@code OutputStream}.
034 */
035@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
036public final class RateLimitedOutputStream
037       extends OutputStream
038{
039  // Indicates whether to automatically flush the stream after each write.
040  private final boolean autoFlush;
041
042  // The fixed-rate barrier that will serve as a rate limiter for this class.
043  private final FixedRateBarrier rateLimiter;
044
045  // The output stream to which the data will actually be written.
046  private final OutputStream wrappedStream;
047
048  // The maximum number of bytes that can be written in any single call to the
049  // rate limiter.
050  private final int maxBytesPerWrite;
051
052
053
054  /**
055   * Creates a new instance of this rate-limited output stream that wraps the
056   * provided output stream.
057   *
058   * @param  wrappedStream      The output stream to which the data will
059   *                            actually be written.  It must not be
060   *                            {@code null}.
061   * @param  maxBytesPerSecond  The maximum number of bytes per second that can
062   *                            be written using this output stream.  It must be
063   *                            greater than zero.
064   * @param  autoFlush          Indicates whether to automatically flush the
065   *                            wrapped output stream after each write.
066   */
067  public RateLimitedOutputStream(final OutputStream wrappedStream,
068                                 final int maxBytesPerSecond,
069                                 final boolean autoFlush)
070  {
071    Validator.ensureTrue((wrappedStream != null),
072         "RateLimitedOutputStream.wrappedStream must not be null.");
073    Validator.ensureTrue((maxBytesPerSecond > 0),
074         "RateLimitedOutputStream.maxBytesPerSecond must be greater than " +
075              "zero.  The provided value was " + maxBytesPerSecond);
076
077    this.wrappedStream = wrappedStream;
078    this.autoFlush = autoFlush;
079
080    rateLimiter = new FixedRateBarrier(1000L, maxBytesPerSecond);
081    maxBytesPerWrite = Math.max(1, (maxBytesPerSecond / 100));
082  }
083
084
085
086  /**
087   * Closes this output stream and the wrapped stream.
088   *
089   * @throws  IOException  If a problem is encountered while closing the wrapped
090   *                       output stream.
091   */
092  @Override()
093  public void close()
094         throws IOException
095  {
096    wrappedStream.close();
097  }
098
099
100
101  /**
102   * Writes a single byte of data to the wrapped output stream.
103   *
104   * @param  b  The byte of data to be written.  Only the least significant
105   *            eight bits will be written.
106   *
107   * @throws  IOException  If a problem is encountered while writing to the
108   *                       wrapped stream.
109   */
110  @Override()
111  public void write(final int b)
112         throws IOException
113  {
114    rateLimiter.await();
115    wrappedStream.write(b);
116
117    if (autoFlush)
118    {
119      wrappedStream.flush();
120    }
121  }
122
123
124
125  /**
126   * Writes the contents of the provided array to the wrapped output stream.
127   *
128   * @param  b  The byte array containing the data to be written.  It must not
129   *            be {@code null}.
130   *
131   * @throws  IOException  If a problem is encountered while writing to the
132   *                       wrapped stream.
133   */
134  @Override()
135  public void write(final byte[] b)
136         throws IOException
137  {
138    write(b, 0, b.length);
139  }
140
141
142
143  /**
144   * Writes the contents of the specified portion of the provided array to the
145   * wrapped output stream.
146   *
147   * @param  b       The byte array containing the data to be written.  It must
148   *                 not be {@code null}.
149   * @param  offset  The position in the provided array at which the data to
150   *                 write begins.  It must be greater than or equal to zero and
151   *                 less than the length of the provided array.
152   * @param  length  The number of bytes to be written.  It must not be
153   *                 negative, and the sum of offset and length must be less
154   *                 than or equal to the length of the provided array.
155   *
156   * @throws  IOException  If a problem is encountered while writing to the
157   *                       wrapped stream.
158   */
159  @Override()
160  public void write(final byte[] b, final int offset, final int length)
161         throws IOException
162  {
163    if (length <= 0)
164    {
165      return;
166    }
167
168    if (length <= maxBytesPerWrite)
169    {
170      rateLimiter.await(length);
171      wrappedStream.write(b, offset, length);
172    }
173    else
174    {
175      int pos = offset;
176      int remainingToWrite = length;
177      while (remainingToWrite > 0)
178      {
179        final int lengthThisWrite =
180             Math.min(remainingToWrite, maxBytesPerWrite);
181        rateLimiter.await(lengthThisWrite);
182        wrappedStream.write(b, pos, lengthThisWrite);
183        pos += lengthThisWrite;
184        remainingToWrite -= lengthThisWrite;
185      }
186    }
187
188    if (autoFlush)
189    {
190      wrappedStream.flush();
191    }
192  }
193
194
195
196  /**
197   * Flushes the contents of the wrapped stream.
198   *
199   * @throws  IOException  If a problem is encountered while flushing the
200   *                       wrapped stream.
201   */
202  @Override()
203  public void flush()
204         throws IOException
205  {
206    wrappedStream.flush();
207  }
208}