001package com.box.sdk;
002
003import com.eclipsesource.json.Json;
004import com.eclipsesource.json.JsonArray;
005import com.eclipsesource.json.JsonObject;
006import com.eclipsesource.json.JsonValue;
007import java.util.ArrayList;
008import java.util.Collection;
009
010/**
011 * Receives real-time events from the API and forwards them to {@link EventListener EventListeners}.
012 *
013 * <p>This class handles long polling the Box events endpoint in order to receive real-time user
014 * events. When an EventStream is started, it begins long polling on a separate thread until the
015 * {@link #stop} method is called. Since the API may return duplicate events, EventStream also
016 * maintains a small cache of the most recently received event IDs in order to automatically
017 * deduplicate events.
018 *
019 * <p>Note: Enterprise Events can be accessed by admin users with the EventLog.getEnterpriseEvents
020 * method
021 */
022public class EventStream {
023
024  private static final int LIMIT = 800;
025  /** Events URL. */
026  public static final URLTemplate EVENT_URL =
027      new URLTemplate("events?limit=" + LIMIT + "&stream_position=%s");
028
029  private static final int STREAM_POSITION_NOW = -1;
030  private static final int DEFAULT_POLLING_DELAY = 1000;
031  private final BoxAPIConnection api;
032  private final long startingPosition;
033  private final int pollingDelay;
034  private final Collection<EventListener> listeners;
035  private final Object listenerLock;
036
037  private LRUCache<String> receivedEvents;
038  private boolean started;
039  private Poller poller;
040  private Thread pollerThread;
041
042  /**
043   * Constructs an EventStream using an API connection.
044   *
045   * @param api the API connection to use.
046   */
047  public EventStream(BoxAPIConnection api) {
048    this(api, STREAM_POSITION_NOW, DEFAULT_POLLING_DELAY);
049  }
050
051  /**
052   * Constructs an EventStream using an API connection and a starting initial position.
053   *
054   * @param api the API connection to use.
055   * @param startingPosition the starting position of the event stream.
056   */
057  public EventStream(BoxAPIConnection api, long startingPosition) {
058    this(api, startingPosition, DEFAULT_POLLING_DELAY);
059  }
060
061  /**
062   * Constructs an EventStream using an API connection and a starting initial position with custom
063   * polling delay.
064   *
065   * @param api the API connection to use.
066   * @param startingPosition the starting position of the event stream.
067   * @param pollingDelay the delay in milliseconds between successive calls to get more events.
068   */
069  public EventStream(BoxAPIConnection api, long startingPosition, int pollingDelay) {
070    this.api = api;
071    this.startingPosition = startingPosition;
072    this.listeners = new ArrayList<>();
073    this.listenerLock = new Object();
074    this.pollingDelay = pollingDelay;
075  }
076
077  /**
078   * Adds a listener that will be notified when an event is received.
079   *
080   * @param listener the listener to add.
081   */
082  public void addListener(EventListener listener) {
083    synchronized (this.listenerLock) {
084      this.listeners.add(listener);
085    }
086  }
087
088  /**
089   * Indicates whether or not this EventStream has been started.
090   *
091   * @return true if this EventStream has been started; otherwise false.
092   */
093  public boolean isStarted() {
094    return this.started;
095  }
096
097  /**
098   * Stops this EventStream and disconnects from the API.
099   *
100   * @throws IllegalStateException if the EventStream is already stopped.
101   */
102  public void stop() {
103    if (!this.started) {
104      throw new IllegalStateException("Cannot stop the EventStream because it isn't started.");
105    }
106
107    this.started = false;
108    this.pollerThread.interrupt();
109  }
110
111  /**
112   * Starts this EventStream and begins long polling the API.
113   *
114   * @throws IllegalStateException if the EventStream is already started.
115   */
116  public void start() {
117    if (this.started) {
118      throw new IllegalStateException("Cannot start the EventStream because it isn't stopped.");
119    }
120
121    final long initialPosition;
122
123    if (this.startingPosition == STREAM_POSITION_NOW) {
124      BoxJSONRequest request =
125          new BoxJSONRequest(this.api, EVENT_URL.buildAlpha(this.api.getBaseURL(), "now"), "GET");
126      try (BoxJSONResponse response = request.send()) {
127        JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
128        initialPosition = jsonObject.get("next_stream_position").asLong();
129      }
130    } else {
131      assert this.startingPosition >= 0 : "Starting position must be non-negative";
132      initialPosition = this.startingPosition;
133    }
134
135    this.poller = new Poller(initialPosition);
136
137    this.pollerThread = new Thread(this.poller);
138    this.pollerThread.setUncaughtExceptionHandler((t, e) -> EventStream.this.notifyException(e));
139    this.pollerThread.start();
140
141    this.started = true;
142  }
143
144  /**
145   * Indicates whether or not an event ID is a duplicate.
146   *
147   * <p>This method can be overridden by a subclass in order to provide custom de-duping logic.
148   *
149   * @param eventID the event ID.
150   * @return true if the event is a duplicate; otherwise false.
151   */
152  protected boolean isDuplicate(String eventID) {
153    if (this.receivedEvents == null) {
154      this.receivedEvents = new LRUCache<>();
155    }
156
157    return !this.receivedEvents.add(eventID);
158  }
159
160  private void notifyNextPosition(long position) {
161    synchronized (this.listenerLock) {
162      for (EventListener listener : this.listeners) {
163        listener.onNextPosition(position);
164      }
165    }
166  }
167
168  private void notifyEvent(BoxEvent event) {
169    synchronized (this.listenerLock) {
170      boolean isDuplicate = this.isDuplicate(event.getID());
171      if (!isDuplicate) {
172        for (EventListener listener : this.listeners) {
173          listener.onEvent(event);
174        }
175      }
176    }
177  }
178
179  private void notifyException(Throwable e) {
180    if (e instanceof InterruptedException && !this.started) {
181      return;
182    }
183
184    this.stop();
185    synchronized (this.listenerLock) {
186      for (EventListener listener : this.listeners) {
187        if (listener.onException(e)) {
188          return;
189        }
190      }
191    }
192  }
193
194  private class Poller implements Runnable {
195    private final long initialPosition;
196
197    private RealtimeServerConnection server;
198
199    Poller(long initialPosition) {
200      this.initialPosition = initialPosition;
201      this.server = new RealtimeServerConnection(EventStream.this.api);
202    }
203
204    @Override
205    public void run() {
206      long position = this.initialPosition;
207      while (!Thread.interrupted()) {
208        if (this.server.getRemainingRetries() == 0) {
209          this.server = new RealtimeServerConnection(EventStream.this.api);
210        }
211
212        if (this.server.waitForChange(position)) {
213          if (Thread.interrupted()) {
214            return;
215          }
216
217          BoxJSONRequest request =
218              new BoxJSONRequest(
219                  EventStream.this.api,
220                  EVENT_URL.buildAlpha(EventStream.this.api.getBaseURL(), position),
221                  "GET");
222          try (BoxJSONResponse response = request.send()) {
223            JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
224            JsonArray entriesArray = jsonObject.get("entries").asArray();
225            for (JsonValue entry : entriesArray) {
226              BoxEvent event = new BoxEvent(EventStream.this.api, entry.asObject());
227              EventStream.this.notifyEvent(event);
228            }
229            position = jsonObject.get("next_stream_position").asLong();
230            EventStream.this.notifyNextPosition(position);
231            try {
232              // Delay re-polling to avoid making too many API calls
233              // Since duplicate events may appear in the stream, without any delay added
234              // the stream can make 3-5 requests per second and not produce any new
235              // events.  A short delay between calls balances latency for new events
236              // and the risk of hitting rate limits.
237              Thread.sleep(EventStream.this.pollingDelay);
238            } catch (InterruptedException ex) {
239              return;
240            }
241          }
242        }
243      }
244    }
245  }
246}