001package com.box.sdk; 002 003import com.eclipsesource.json.Json; 004import com.eclipsesource.json.JsonArray; 005import com.eclipsesource.json.JsonObject; 006import com.eclipsesource.json.JsonValue; 007import java.util.ArrayList; 008import java.util.Collection; 009 010/** 011 * Receives real-time events from the API and forwards them to {@link EventListener EventListeners}. 012 * 013 * <p>This class handles long polling the Box events endpoint in order to receive real-time user 014 * events. When an EventStream is started, it begins long polling on a separate thread until the 015 * {@link #stop} method is called. Since the API may return duplicate events, EventStream also 016 * maintains a small cache of the most recently received event IDs in order to automatically 017 * deduplicate events. 018 * 019 * <p>Note: Enterprise Events can be accessed by admin users with the EventLog.getEnterpriseEvents 020 * method 021 */ 022public class EventStream { 023 024 private static final int LIMIT = 800; 025 /** Events URL. */ 026 public static final URLTemplate EVENT_URL = 027 new URLTemplate("events?limit=" + LIMIT + "&stream_position=%s"); 028 029 private static final int STREAM_POSITION_NOW = -1; 030 private static final int DEFAULT_POLLING_DELAY = 1000; 031 private final BoxAPIConnection api; 032 private final long startingPosition; 033 private final int pollingDelay; 034 private final Collection<EventListener> listeners; 035 private final Object listenerLock; 036 037 private LRUCache<String> receivedEvents; 038 private boolean started; 039 private Poller poller; 040 private Thread pollerThread; 041 042 /** 043 * Constructs an EventStream using an API connection. 044 * 045 * @param api the API connection to use. 046 */ 047 public EventStream(BoxAPIConnection api) { 048 this(api, STREAM_POSITION_NOW, DEFAULT_POLLING_DELAY); 049 } 050 051 /** 052 * Constructs an EventStream using an API connection and a starting initial position. 053 * 054 * @param api the API connection to use. 055 * @param startingPosition the starting position of the event stream. 056 */ 057 public EventStream(BoxAPIConnection api, long startingPosition) { 058 this(api, startingPosition, DEFAULT_POLLING_DELAY); 059 } 060 061 /** 062 * Constructs an EventStream using an API connection and a starting initial position with custom 063 * polling delay. 064 * 065 * @param api the API connection to use. 066 * @param startingPosition the starting position of the event stream. 067 * @param pollingDelay the delay in milliseconds between successive calls to get more events. 068 */ 069 public EventStream(BoxAPIConnection api, long startingPosition, int pollingDelay) { 070 this.api = api; 071 this.startingPosition = startingPosition; 072 this.listeners = new ArrayList<>(); 073 this.listenerLock = new Object(); 074 this.pollingDelay = pollingDelay; 075 } 076 077 /** 078 * Adds a listener that will be notified when an event is received. 079 * 080 * @param listener the listener to add. 081 */ 082 public void addListener(EventListener listener) { 083 synchronized (this.listenerLock) { 084 this.listeners.add(listener); 085 } 086 } 087 088 /** 089 * Indicates whether or not this EventStream has been started. 090 * 091 * @return true if this EventStream has been started; otherwise false. 092 */ 093 public boolean isStarted() { 094 return this.started; 095 } 096 097 /** 098 * Stops this EventStream and disconnects from the API. 099 * 100 * @throws IllegalStateException if the EventStream is already stopped. 101 */ 102 public void stop() { 103 if (!this.started) { 104 throw new IllegalStateException("Cannot stop the EventStream because it isn't started."); 105 } 106 107 this.started = false; 108 this.pollerThread.interrupt(); 109 } 110 111 /** 112 * Starts this EventStream and begins long polling the API. 113 * 114 * @throws IllegalStateException if the EventStream is already started. 115 */ 116 public void start() { 117 if (this.started) { 118 throw new IllegalStateException("Cannot start the EventStream because it isn't stopped."); 119 } 120 121 final long initialPosition; 122 123 if (this.startingPosition == STREAM_POSITION_NOW) { 124 BoxJSONRequest request = 125 new BoxJSONRequest(this.api, EVENT_URL.buildAlpha(this.api.getBaseURL(), "now"), "GET"); 126 try (BoxJSONResponse response = request.send()) { 127 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 128 initialPosition = jsonObject.get("next_stream_position").asLong(); 129 } 130 } else { 131 assert this.startingPosition >= 0 : "Starting position must be non-negative"; 132 initialPosition = this.startingPosition; 133 } 134 135 this.poller = new Poller(initialPosition); 136 137 this.pollerThread = new Thread(this.poller); 138 this.pollerThread.setUncaughtExceptionHandler((t, e) -> EventStream.this.notifyException(e)); 139 this.pollerThread.start(); 140 141 this.started = true; 142 } 143 144 /** 145 * Indicates whether or not an event ID is a duplicate. 146 * 147 * <p>This method can be overridden by a subclass in order to provide custom de-duping logic. 148 * 149 * @param eventID the event ID. 150 * @return true if the event is a duplicate; otherwise false. 151 */ 152 protected boolean isDuplicate(String eventID) { 153 if (this.receivedEvents == null) { 154 this.receivedEvents = new LRUCache<>(); 155 } 156 157 return !this.receivedEvents.add(eventID); 158 } 159 160 private void notifyNextPosition(long position) { 161 synchronized (this.listenerLock) { 162 for (EventListener listener : this.listeners) { 163 listener.onNextPosition(position); 164 } 165 } 166 } 167 168 private void notifyEvent(BoxEvent event) { 169 synchronized (this.listenerLock) { 170 boolean isDuplicate = this.isDuplicate(event.getID()); 171 if (!isDuplicate) { 172 for (EventListener listener : this.listeners) { 173 listener.onEvent(event); 174 } 175 } 176 } 177 } 178 179 private void notifyException(Throwable e) { 180 if (e instanceof InterruptedException && !this.started) { 181 return; 182 } 183 184 this.stop(); 185 synchronized (this.listenerLock) { 186 for (EventListener listener : this.listeners) { 187 if (listener.onException(e)) { 188 return; 189 } 190 } 191 } 192 } 193 194 private class Poller implements Runnable { 195 private final long initialPosition; 196 197 private RealtimeServerConnection server; 198 199 Poller(long initialPosition) { 200 this.initialPosition = initialPosition; 201 this.server = new RealtimeServerConnection(EventStream.this.api); 202 } 203 204 @Override 205 public void run() { 206 long position = this.initialPosition; 207 while (!Thread.interrupted()) { 208 if (this.server.getRemainingRetries() == 0) { 209 this.server = new RealtimeServerConnection(EventStream.this.api); 210 } 211 212 if (this.server.waitForChange(position)) { 213 if (Thread.interrupted()) { 214 return; 215 } 216 217 BoxJSONRequest request = 218 new BoxJSONRequest( 219 EventStream.this.api, 220 EVENT_URL.buildAlpha(EventStream.this.api.getBaseURL(), position), 221 "GET"); 222 try (BoxJSONResponse response = request.send()) { 223 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 224 JsonArray entriesArray = jsonObject.get("entries").asArray(); 225 for (JsonValue entry : entriesArray) { 226 BoxEvent event = new BoxEvent(EventStream.this.api, entry.asObject()); 227 EventStream.this.notifyEvent(event); 228 } 229 position = jsonObject.get("next_stream_position").asLong(); 230 EventStream.this.notifyNextPosition(position); 231 try { 232 // Delay re-polling to avoid making too many API calls 233 // Since duplicate events may appear in the stream, without any delay added 234 // the stream can make 3-5 requests per second and not produce any new 235 // events. A short delay between calls balances latency for new events 236 // and the risk of hitting rate limits. 237 Thread.sleep(EventStream.this.pollingDelay); 238 } catch (InterruptedException ex) { 239 return; 240 } 241 } 242 } 243 } 244 } 245 } 246}