FlowControlledDataChannel.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. /* _____ _
  2. * |_ _| |_ _ _ ___ ___ _ __ __ _
  3. * | | | ' \| '_/ -_) -_) ' \/ _` |_
  4. * |_| |_||_|_| \___\___|_|_|_\__,_(_)
  5. *
  6. * Threema for Android
  7. * Copyright (c) 2019-2022 Threema GmbH
  8. *
  9. * This program is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU Affero General Public License, version 3,
  11. * as published by the Free Software Foundation.
  12. *
  13. * This program is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU Affero General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU Affero General Public License
  19. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  20. */
  21. package ch.threema.app.webrtc;
  22. import org.saltyrtc.tasks.webrtc.exceptions.IllegalStateError;
  23. import org.slf4j.Logger;
  24. import org.webrtc.DataChannel;
  25. import androidx.annotation.AnyThread;
  26. import androidx.annotation.NonNull;
  27. import ch.threema.base.utils.LoggingUtil;
  28. import ch.threema.logging.ThreemaLogger;
  29. import java8.util.concurrent.CompletableFuture;
  30. /**
  31. * A flow-controlled (sender side) data channel.
  32. *
  33. * When using this, make sure to properly call `bufferedAmountChange` when the corresponding
  34. * event on the data channel is received.
  35. */
  36. @AnyThread
  37. public class FlowControlledDataChannel {
  38. @NonNull final private Logger logger = LoggingUtil.getThreemaLogger("FlowControlledDataChannel");
  39. @NonNull public final DataChannel dc;
  40. private final long lowWaterMark;
  41. private final long highWaterMark;
  42. @NonNull private CompletableFuture<Void> readyFuture = CompletableFuture.completedFuture(null);
  43. /**
  44. * Create a flow-controlled (sender side) data channel.
  45. *
  46. * @param dc The data channel to be flow-controlled
  47. */
  48. public FlowControlledDataChannel(@NonNull final String logPrefix, @NonNull final DataChannel dc) {
  49. this(logPrefix, dc, 256 * 1024, 1024 * 1024);
  50. }
  51. /**
  52. * Create a flow-controlled (sender side) data channel.
  53. *
  54. * @param dc The data channel to be flow-controlled
  55. * @param lowWaterMark The low water mark unpauses the data channel once
  56. * the buffered amount of bytes becomes less or equal to it.
  57. * @param highWaterMark The high water mark pauses the data channel once
  58. * the buffered amount of bytes becomes greater or equal to it.
  59. */
  60. public FlowControlledDataChannel(
  61. @NonNull final String logPrefix,
  62. @NonNull final DataChannel dc,
  63. final long lowWaterMark,
  64. final long highWaterMark
  65. ) {
  66. // Set logger prefix
  67. if (logger instanceof ThreemaLogger) {
  68. ((ThreemaLogger) logger).setPrefix(logPrefix + "." + dc.label() + "/" + dc.id());
  69. }
  70. this.dc = dc;
  71. this.lowWaterMark = lowWaterMark;
  72. this.highWaterMark = highWaterMark;
  73. }
  74. /**
  75. * Return the low water mark.
  76. */
  77. public long getLowWaterMark() {
  78. return this.lowWaterMark;
  79. }
  80. /**
  81. * Return the high water mark.
  82. */
  83. public long getHighWaterMark() {
  84. return this.highWaterMark;
  85. }
  86. /**
  87. * A future whether the data channel is ready to be written on.
  88. */
  89. @NonNull public synchronized CompletableFuture<Void> ready() {
  90. return this.readyFuture;
  91. }
  92. /**
  93. * Write a message to the data channel's internal buffer for delivery to
  94. * the remote side.
  95. *
  96. * Important: Before calling this, the `ready` Promise must be awaited.
  97. *
  98. * @param message The message to be sent.
  99. * @throws IllegalStateError in case the data channel is currently paused.
  100. */
  101. public synchronized void write(@NonNull final DataChannel.Buffer message) {
  102. // Note: Locked since the "onBufferedAmountChange" event must run in parallel to the send
  103. // calls.
  104. // Throw if paused
  105. if (!this.ready().isDone()) {
  106. throw new IllegalStateError("Unable to write, data channel is paused!");
  107. }
  108. // Try sending
  109. // Note: Technically we should be able to catch an Exception in case the
  110. // underlying buffer is full. However, webrtc.org is utterly
  111. // outdated and just closes when its buffer would overflow. Thus,
  112. // we use a well-tested high water mark instead and try to never
  113. // fill the buffer completely.
  114. if (!this.dc.send(message)) {
  115. // This can happen when the data channel is closing.
  116. throw new IllegalStateError("Unable to send in state " + this.dc.state());
  117. }
  118. // Pause once high water mark has been reached
  119. final long bufferedAmount = this.dc.bufferedAmount();
  120. if (bufferedAmount >= this.highWaterMark) {
  121. this.readyFuture = new CompletableFuture<>();
  122. if (logger.isDebugEnabled()) {
  123. logger.debug("{} paused (buffered={})", this.dc.label(), bufferedAmount);
  124. }
  125. }
  126. }
  127. /**
  128. * Must be called when the data channel's buffered amount changed.
  129. *
  130. * Important: You MUST ensure that you're not calling this from the send thread of the data
  131. * channel! When in doubt, post it to some other thread!
  132. */
  133. public synchronized void bufferedAmountChange() {
  134. final long bufferedAmount;
  135. try {
  136. bufferedAmount = this.dc.bufferedAmount();
  137. } catch (IllegalStateException e) {
  138. logger.warn("IllegalStateException when calling `dc.bufferedAmount`, data channel already disposed?");
  139. return;
  140. }
  141. // Unpause once low water mark has been reached
  142. if (bufferedAmount <= this.lowWaterMark && !this.readyFuture.isDone()) {
  143. if (logger.isDebugEnabled()) {
  144. logger.debug("{} resumed (buffered={})", this.dc.label(), bufferedAmount);
  145. }
  146. this.readyFuture.complete(null);
  147. }
  148. }
  149. }