public class

BufferedAudioStream

extends java.lang.Object

implements AudioStream

 java.lang.Object

↳androidx.camera.video.internal.audio.BufferedAudioStream

Gradle dependencies

compile group: 'androidx.camera', name: 'camera-video', version: '1.5.0-alpha01'

  • groupId: androidx.camera
  • artifactId: camera-video
  • version: 1.5.0-alpha01

Artifact androidx.camera:camera-video:1.5.0-alpha01 it located at Google repository (https://maven.google.com/)

Overview

The class implements a buffered AudioStream.

A BufferedAudioStream adds functionality to another AudioStream, the ability to buffer the input audio data and to decouple audio data producing with consuming. When the BufferedAudioStream is created, an internal buffer queue is created. The queue's size is limited to prevent memory from being overused. When the queue's size exceeds the limit, the oldest cached data will be discarded.

This class is not thread safe, it should be used on the same thread.

Summary

Constructors
publicBufferedAudioStream(AudioStream audioStream, AudioSettings audioSettings)

Methods
public AudioStream.PacketInforead(java.nio.ByteBuffer byteBuffer)

public voidrelease()

public voidsetCallback(AudioStream.AudioStreamCallback callback, java.util.concurrent.Executor executor)

public voidstart()

public voidstop()

from java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Constructors

public BufferedAudioStream(AudioStream audioStream, AudioSettings audioSettings)

Methods

public void start()

public void stop()

public void release()

public AudioStream.PacketInfo read(java.nio.ByteBuffer byteBuffer)

public void setCallback(AudioStream.AudioStreamCallback callback, java.util.concurrent.Executor executor)

Source

/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.video.internal.audio;

import static androidx.camera.video.internal.audio.AudioUtils.frameCountToDurationNs;
import static androidx.camera.video.internal.audio.AudioUtils.sizeToFrameCount;
import static androidx.core.util.Preconditions.checkArgument;
import static androidx.core.util.Preconditions.checkState;

import android.annotation.SuppressLint;

import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.camera.core.Logger;
import androidx.camera.core.impl.annotation.ExecutedBy;
import androidx.camera.core.impl.utils.executor.CameraXExecutors;

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * The class implements a buffered AudioStream.
 *
 * <p>A BufferedAudioStream adds functionality to another AudioStream, the ability to buffer the
 * input audio data and to decouple audio data producing with consuming. When the
 * BufferedAudioStream is created, an internal buffer queue is created. The queue's size is limited
 * to prevent memory from being overused. When the queue's size exceeds the limit, the oldest
 * cached data will be discarded.
 *
 * <p>This class is not thread safe, it should be used on the same thread.
 */
public class BufferedAudioStream implements AudioStream {

    private static final String TAG = "BufferedAudioStream";
    private static final int DEFAULT_BUFFER_SIZE_IN_FRAME = 1024;
    private static final int DEFAULT_QUEUE_SIZE = 500;
    private static final int DATA_WAITING_TIME_MILLIS = 1;

    private final AtomicBoolean mIsStarted = new AtomicBoolean(false);
    private final AtomicBoolean mIsReleased = new AtomicBoolean(false);
    @GuardedBy("mLock")
    private final Queue<AudioData> mAudioDataQueue = new ConcurrentLinkedQueue<>();
    private final Executor mProducerExecutor = CameraXExecutors.newSequentialExecutor(
            CameraXExecutors.audioExecutor());
    private final Object mLock = new Object();
    @GuardedBy("mLock")
    @Nullable
    private AudioData mAudioDataNotFullyRead = null;

    ////////////////////////////////////////////////////////////////////////////////////////////////
    //                      Members only accessed on mProducerExecutor                            //
    ////////////////////////////////////////////////////////////////////////////////////////////////
    private final AudioStream mAudioStream;
    private final int mBytesPerFrame;
    private final int mSampleRate;
    private final int mQueueMaxSize;
    private final AtomicBoolean mIsCollectingAudioData = new AtomicBoolean(false);
    private int mBufferSize;

    public BufferedAudioStream(@NonNull AudioStream audioStream,
            @NonNull AudioSettings audioSettings) {
        mAudioStream = audioStream;
        mBytesPerFrame = audioSettings.getBytesPerFrame();
        mSampleRate = audioSettings.getSampleRate();

        checkArgument(mBytesPerFrame > 0L, "mBytesPerFrame must be greater than 0.");
        checkArgument(mSampleRate > 0L, "mSampleRate must be greater than 0.");

        mQueueMaxSize = DEFAULT_QUEUE_SIZE;
        mBufferSize = DEFAULT_BUFFER_SIZE_IN_FRAME * mBytesPerFrame;
    }

    @Override
    public void start() throws AudioStreamException, IllegalStateException {
        checkNotReleasedOrThrow();
        if (mIsStarted.getAndSet(true)) {
            return;
        }

        // Start internal audio data collection.
        RunnableFuture<Void> startTask = new FutureTask<>(() -> {
            try {
                mAudioStream.start();
                startCollectingAudioData();
            } catch (AudioStreamException e) {
                throw new RuntimeException(e);
            }
        }, null);
        mProducerExecutor.execute(startTask);

        // Wait for the internal audio stream to start.
        try {
            startTask.get();
        } catch (InterruptedException | ExecutionException e) {
            mIsStarted.set(false);
            throw new AudioStreamException(e);
        }
    }

    @Override
    public void stop() throws IllegalStateException {
        checkNotReleasedOrThrow();
        if (!mIsStarted.getAndSet(false)) {
            return;
        }

        // Stop internal audio data collection.
        mProducerExecutor.execute(() -> {
            mIsCollectingAudioData.set(false);
            mAudioStream.stop();
            synchronized (mLock) {
                mAudioDataNotFullyRead = null;
                mAudioDataQueue.clear();
            }
        });
    }

    @Override
    public void release() {
        if (mIsReleased.getAndSet(true)) {
            return;
        }

        mProducerExecutor.execute(() -> {
            mIsCollectingAudioData.set(false);
            mAudioStream.release();
            synchronized (mLock) {
                mAudioDataNotFullyRead = null;
                mAudioDataQueue.clear();
            }
        });
    }

    @SuppressLint("BanThreadSleep")
    @NonNull
    @Override
    public PacketInfo read(@NonNull ByteBuffer byteBuffer) {
        checkNotReleasedOrThrow();
        checkStartedOrThrow();

        // Match collection buffer size and read buffer size to improve read efficiency.
        updateCollectionBufferSizeAsync(byteBuffer.remaining());

        // Block the thread till the audio data is actually read.
        boolean isWaitingForData;
        PacketInfo packetInfo = PacketInfo.of(0, 0);
        do {
            synchronized (mLock) {
                AudioData audioData = mAudioDataNotFullyRead;
                mAudioDataNotFullyRead = null;
                if (audioData == null) {
                    audioData = mAudioDataQueue.poll();
                }

                if (audioData != null) {
                    packetInfo = audioData.read(byteBuffer);

                    if (audioData.getRemainingBufferSizeInBytes() > 0) {
                        mAudioDataNotFullyRead = audioData;
                    }
                }
            }

            // Wait for data collection if no data to read and the audio stream is still running.
            isWaitingForData =
                    packetInfo.getSizeInBytes() <= 0 && mIsStarted.get() && !mIsReleased.get();

            // Sleep to prevent busy accessing to variables.
            if (isWaitingForData) {
                try {
                    Thread.sleep(DATA_WAITING_TIME_MILLIS);
                } catch (InterruptedException e) {
                    Logger.w(TAG, "Interruption while waiting for audio data", e);
                    break;
                }
            }
        } while (isWaitingForData);

        return packetInfo;
    }

    @Override
    public void setCallback(@Nullable AudioStreamCallback callback, @Nullable Executor executor) {
        checkState(!mIsStarted.get(), "AudioStream can not be started when setCallback.");
        checkNotReleasedOrThrow();
        checkArgument(callback == null || executor != null,
                "executor can't be null with non-null callback.");

        mProducerExecutor.execute(() -> mAudioStream.setCallback(callback, executor));
    }

    private void checkNotReleasedOrThrow() {
        checkState(!mIsReleased.get(), "AudioStream has been released.");
    }

    private void checkStartedOrThrow() {
        checkState(mIsStarted.get(), "AudioStream has not been started.");
    }

    private void updateCollectionBufferSizeAsync(int bufferSize) {
        mProducerExecutor.execute(() -> updateCollectionBufferSize(bufferSize));
    }

    @ExecutedBy("mProducerExecutor")
    private void updateCollectionBufferSize(int bufferSize) {
        if (mBufferSize == bufferSize) {
            return;
        }

        // Ensure buffer size is multiple of the frame size.
        int originalBufferSize = mBufferSize;
        int newFrameSize = bufferSize / mBytesPerFrame;
        mBufferSize = newFrameSize * mBytesPerFrame;

        Logger.d(TAG, "Update buffer size from " + originalBufferSize + " to " + mBufferSize);
    }

    @ExecutedBy("mProducerExecutor")
    private void startCollectingAudioData() {
        if (mIsCollectingAudioData.getAndSet(true)) {
            return;
        }

        collectAudioData();
    }

    @ExecutedBy("mProducerExecutor")
    private void collectAudioData() {
        if (!mIsCollectingAudioData.get()) {
            return;
        }

        // Read audio data.
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(mBufferSize);
        PacketInfo packetInfo = mAudioStream.read(byteBuffer);
        AudioData audioData = new AudioData(byteBuffer, packetInfo, mBytesPerFrame, mSampleRate);

        // Push audio data to the queue.
        int queueMaxSize = mQueueMaxSize;
        synchronized (mLock) {
            mAudioDataQueue.offer(audioData);

            // Pop audio data when the queue size exceeds the limit.
            while (mAudioDataQueue.size() > queueMaxSize) {
                mAudioDataQueue.poll();
                Logger.w(TAG, "Drop audio data due to full of queue.");
            }
        }

        // Start next data collection.
        if (mIsCollectingAudioData.get()) {
            mProducerExecutor.execute(this::collectAudioData);
        }
    }

    private static class AudioData {

        private final int mBytesPerFrame;
        private final int mSampleRate;
        private final ByteBuffer mByteBuffer;
        private long mTimestampNs;

        AudioData(@NonNull ByteBuffer byteBuffer, @NonNull PacketInfo packetInfo,
                int bytesPerFrame, int sampleRate) {
            // Make the buffer ready for reading.
            byteBuffer.rewind();

            // Check if byte buffer match with packet info.
            int bufferSize = byteBuffer.limit() - byteBuffer.position();
            if (bufferSize != packetInfo.getSizeInBytes()) {
                throw new IllegalStateException(
                        "Byte buffer size is not match with packet info: " + bufferSize + " != "
                                + packetInfo.getSizeInBytes());
            }

            mBytesPerFrame = bytesPerFrame;
            mSampleRate = sampleRate;
            mByteBuffer = byteBuffer;
            mTimestampNs = packetInfo.getTimestampNs();
        }

        public int getRemainingBufferSizeInBytes() {
            return mByteBuffer.remaining();
        }

        public PacketInfo read(@NonNull ByteBuffer byteBuffer) {
            long timestampNs = mTimestampNs;

            // Check the read size, read data and handle timestamp for the remaining data.
            int readSizeInBytes;
            int originalSourcePosition = mByteBuffer.position();
            int originalDestinationPosition = byteBuffer.position();
            if (mByteBuffer.remaining() > byteBuffer.remaining()) {
                readSizeInBytes = byteBuffer.remaining();

                // Update the next timestamp to the start of the unread part.
                long readFrames = sizeToFrameCount(readSizeInBytes, mBytesPerFrame);
                long readDurationNs = frameCountToDurationNs(readFrames, mSampleRate);
                mTimestampNs += readDurationNs;

                // Use the duplicated byte buffer to put data into the destination to limit the
                // read size and to not corrupt the source.
                ByteBuffer duplicatedByteBuffer = mByteBuffer.duplicate();
                duplicatedByteBuffer.position(originalSourcePosition)
                        .limit(originalSourcePosition + readSizeInBytes);
                byteBuffer.put(duplicatedByteBuffer)
                        .limit(originalDestinationPosition + readSizeInBytes)
                        .position(originalDestinationPosition);

            } else {
                readSizeInBytes = mByteBuffer.remaining();

                // Put data into byte buffer.
                byteBuffer.put(mByteBuffer)
                        .limit(originalDestinationPosition + readSizeInBytes)
                        .position(originalDestinationPosition);
            }

            // Point to the start of the unread part.
            mByteBuffer.position(originalSourcePosition + readSizeInBytes);

            return PacketInfo.of(readSizeInBytes, timestampNs);
        }
    }
}