diff --git a/src/Tools/IdeCoreBenchmarks/IdeCoreBenchmarks.csproj b/src/Tools/IdeCoreBenchmarks/IdeCoreBenchmarks.csproj index 6ff0a32ca7726..75fb67aa863ff 100644 --- a/src/Tools/IdeCoreBenchmarks/IdeCoreBenchmarks.csproj +++ b/src/Tools/IdeCoreBenchmarks/IdeCoreBenchmarks.csproj @@ -16,6 +16,7 @@ + @@ -38,6 +39,7 @@ + diff --git a/src/VisualStudio/CSharp/Test/PersistentStorage/AbstractPersistentStorageTests.cs b/src/VisualStudio/CSharp/Test/PersistentStorage/AbstractPersistentStorageTests.cs index d51cb83ca8fd7..09ffc4b65775b 100644 --- a/src/VisualStudio/CSharp/Test/PersistentStorage/AbstractPersistentStorageTests.cs +++ b/src/VisualStudio/CSharp/Test/PersistentStorage/AbstractPersistentStorageTests.cs @@ -786,6 +786,26 @@ public void CacheDirectoryShouldNotBeAtRoot() Assert.False(location?.StartsWith("/") ?? false); } + [Theory] + [CombinatorialData] + public async Task PersistentService_ReadByteTwice(Size size, bool withChecksum) + { + var solution = CreateOrOpenSolution(); + var streamName1 = "PersistentService_ReadByteTwice"; + + await using (var storage = await GetStorageAsync(solution)) + { + Assert.True(await storage.WriteStreamAsync(streamName1, EncodeString(GetData1(size)), GetChecksum1(withChecksum))); + } + + await using (var storage = await GetStorageAsync(solution)) + { + using var stream = await storage.ReadStreamAsync(streamName1, GetChecksum1(withChecksum)); + stream.ReadByte(); + stream.ReadByte(); + } + } + [PartNotDiscoverable] [ExportWorkspaceService(typeof(IPersistentStorageLocationService), layer: ServiceLayer.Test), Shared] private class TestPersistentStorageLocationService : DefaultPersistentStorageLocationService diff --git a/src/VisualStudio/Core/Def/Microsoft.VisualStudio.LanguageServices.csproj b/src/VisualStudio/Core/Def/Microsoft.VisualStudio.LanguageServices.csproj index cfd6cf61be6b8..324c659f597a5 100644 --- a/src/VisualStudio/Core/Def/Microsoft.VisualStudio.LanguageServices.csproj +++ b/src/VisualStudio/Core/Def/Microsoft.VisualStudio.LanguageServices.csproj @@ -169,6 +169,7 @@ + diff --git a/src/VisualStudio/Core/Def/Storage/CloudCachePersistentStorage.cs b/src/VisualStudio/Core/Def/Storage/CloudCachePersistentStorage.cs index d538d67739f67..6f19267631ff3 100644 --- a/src/VisualStudio/Core/Def/Storage/CloudCachePersistentStorage.cs +++ b/src/VisualStudio/Core/Def/Storage/CloudCachePersistentStorage.cs @@ -13,6 +13,7 @@ using Microsoft.CodeAnalysis.PersistentStorage; using Microsoft.CodeAnalysis.PooledObjects; using Microsoft.VisualStudio.RpcContracts.Caching; +using Nerdbank.Streams; using Roslyn.Utilities; namespace Microsoft.VisualStudio.LanguageServices.Storage @@ -163,16 +164,27 @@ private async Task ChecksumMatchesAsync(string name, Checksum checksum, Ca // and then pass that out. This should not be a problem in practice as PipeReader internally intelligently // uses and pools reasonable sized buffers, preventing us from exacerbating the GC or causing LOH // allocations. + return await AsPrebufferedStreamAsync(pipe.Reader, cancellationToken).ConfigureAwait(false); + } + + private static async Task AsPrebufferedStreamAsync(PipeReader pipeReader, CancellationToken cancellationToken = default) + { while (true) { - var readResult = await pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); - pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); + // Read and immediately report all bytes as "examined" so that the next ReadAsync call will block till more bytes come in. + // The goal here is to force the PipeReader to buffer everything internally (even if it were to exceed its natural writer threshold limit). + ReadResult readResult = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); + pipeReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); if (readResult.IsCompleted) - break; + { + // After having buffered and "examined" all the bytes, the stream returned from PipeReader.AsStream() would fail + // because it may not "examine" all bytes at once. + // Instead, we'll create our own Stream over just the buffer itself, and recycle the buffers when the stream is disposed + // the way the stream returned from PipeReader.AsStream() would have. + return new ReadOnlySequenceStream(readResult.Buffer, reader => ((PipeReader)reader!).Complete(), pipeReader); + } } - - return pipe.Reader.AsStream(); } public sealed override Task WriteStreamAsync(string name, Stream stream, Checksum? checksum, CancellationToken cancellationToken) diff --git a/src/VisualStudio/Core/Def/Storage/Nerdbank/ReadOnlySequenceStream.cs b/src/VisualStudio/Core/Def/Storage/Nerdbank/ReadOnlySequenceStream.cs new file mode 100644 index 0000000000000..eee38d978c543 --- /dev/null +++ b/src/VisualStudio/Core/Def/Storage/Nerdbank/ReadOnlySequenceStream.cs @@ -0,0 +1,262 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// Copied from https://raw.githubusercontent.com/AArnott/Nerdbank.Streams/2b142fa6a38b15e4b06ecc53bf073aa49fd1de34/src/Nerdbank.Streams/ReadOnlySequenceStream.cs +// Remove once we move to Nerdbank.Streams 2.7.62-alpha + +namespace Nerdbank.Streams +{ + using System; + using System.Buffers; + using System.IO; + using System.Runtime.InteropServices; + using System.Threading; + using System.Threading.Tasks; + using Microsoft; + + internal class ReadOnlySequenceStream : Stream, IDisposableObservable + { + private static readonly Task TaskOfZero = Task.FromResult(0); + + private readonly Action? disposeAction; + private readonly object? disposeActionArg; + + /// + /// A reusable task if two consecutive reads return the same number of bytes. + /// + private Task? lastReadTask; + + private readonly ReadOnlySequence readOnlySequence; + + private SequencePosition position; + + internal ReadOnlySequenceStream(ReadOnlySequence readOnlySequence, Action? disposeAction, object? disposeActionArg) + { + this.readOnlySequence = readOnlySequence; + this.disposeAction = disposeAction; + this.disposeActionArg = disposeActionArg; + this.position = readOnlySequence.Start; + } + + /// + public override bool CanRead => !this.IsDisposed; + + /// + public override bool CanSeek => !this.IsDisposed; + + /// + public override bool CanWrite => false; + + /// + public override long Length => this.ReturnOrThrowDisposed(this.readOnlySequence.Length); + + /// + public override long Position + { + get => this.readOnlySequence.Slice(0, this.position).Length; + set + { + Requires.Range(value >= 0, nameof(value)); + this.position = this.readOnlySequence.GetPosition(value, this.readOnlySequence.Start); + } + } + + /// + public bool IsDisposed { get; private set; } + + /// + public override void Flush() => this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override Task FlushAsync(CancellationToken cancellationToken) => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override int Read(byte[] buffer, int offset, int count) + { + ReadOnlySequence remaining = this.readOnlySequence.Slice(this.position); + ReadOnlySequence toCopy = remaining.Slice(0, Math.Min(count, remaining.Length)); + this.position = toCopy.End; + toCopy.CopyTo(buffer.AsSpan(offset, count)); + return (int)toCopy.Length; + } + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + int bytesRead = this.Read(buffer, offset, count); + if (bytesRead == 0) + { + return TaskOfZero; + } + + if (this.lastReadTask?.Result == bytesRead) + { + return this.lastReadTask; + } + else + { + return this.lastReadTask = Task.FromResult(bytesRead); + } + } + + /// + public override int ReadByte() + { + ReadOnlySequence remaining = this.readOnlySequence.Slice(this.position); + if (remaining.Length > 0) + { + byte result = remaining.First.Span[0]; + this.position = this.readOnlySequence.GetPosition(1, this.position); + return result; + } + else + { + return -1; + } + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + Verify.NotDisposed(this); + + SequencePosition relativeTo; + switch (origin) + { + case SeekOrigin.Begin: + relativeTo = this.readOnlySequence.Start; + break; + case SeekOrigin.Current: + if (offset >= 0) + { + relativeTo = this.position; + } + else + { + relativeTo = this.readOnlySequence.Start; + offset += this.Position; + } + + break; + case SeekOrigin.End: + if (offset >= 0) + { + relativeTo = this.readOnlySequence.End; + } + else + { + relativeTo = this.readOnlySequence.Start; + offset += this.Position; + } + + break; + default: + throw new ArgumentOutOfRangeException(nameof(origin)); + } + + this.position = this.readOnlySequence.GetPosition(offset, relativeTo); + return this.Position; + } + + /// + public override void SetLength(long value) => this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override void Write(byte[] buffer, int offset, int count) => this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override void WriteByte(byte value) => this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + foreach (var segment in this.readOnlySequence) + { + await WriteAsync(destination, segment, cancellationToken).ConfigureAwait(false); + } + } + + private static ValueTask WriteAsync(Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + Requires.NotNull(stream, nameof(stream)); + + if (MemoryMarshal.TryGetArray(buffer, out ArraySegment array)) + { + return new ValueTask(stream.WriteAsync(array.Array!, array.Offset, array.Count, cancellationToken)); + } + else + { + byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length); + buffer.Span.CopyTo(sharedBuffer); + return new ValueTask(FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer)); + } + + async Task FinishWriteAsync(Task writeTask, byte[] localBuffer) + { + try + { + await writeTask.ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(localBuffer); + } + } + } + +#if SPAN_BUILTIN + + /// + public override int Read(Span buffer) + { + ReadOnlySequence remaining = this.readOnlySequence.Slice(this.position); + ReadOnlySequence toCopy = remaining.Slice(0, Math.Min(buffer.Length, remaining.Length)); + this.position = toCopy.End; + toCopy.CopyTo(buffer); + return (int)toCopy.Length; + } + + /// + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + return new ValueTask(this.Read(buffer.Span)); + } + + /// + public override void Write(ReadOnlySpan buffer) => throw this.ThrowDisposedOr(new NotSupportedException()); + + /// + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => throw this.ThrowDisposedOr(new NotSupportedException()); + +#endif + + /// + protected override void Dispose(bool disposing) + { + if (!this.IsDisposed) + { + this.IsDisposed = true; + this.disposeAction?.Invoke(this.disposeActionArg); + base.Dispose(disposing); + } + } + + private T ReturnOrThrowDisposed(T value) + { + Verify.NotDisposed(this); + return value; + } + + private Exception ThrowDisposedOr(Exception ex) + { + Verify.NotDisposed(this); + throw ex; + } + } +} diff --git a/src/Workspaces/Remote/ServiceHub/Microsoft.CodeAnalysis.Remote.ServiceHub.csproj b/src/Workspaces/Remote/ServiceHub/Microsoft.CodeAnalysis.Remote.ServiceHub.csproj index 4e009fb8a1fea..5ad1f662f2edb 100644 --- a/src/Workspaces/Remote/ServiceHub/Microsoft.CodeAnalysis.Remote.ServiceHub.csproj +++ b/src/Workspaces/Remote/ServiceHub/Microsoft.CodeAnalysis.Remote.ServiceHub.csproj @@ -29,6 +29,7 @@ + @@ -39,6 +40,7 @@ +