Back to index...
/*
 * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.rmi.transport.tcp;
import java.io.*;
/**
 * MultiplexInputStream manages receiving data over a connection managed
 * by a ConnectionMultiplexer object.  This object is responsible for
 * requesting more bytes of data as space in its internal buffer becomes
 * available.
 *
 * @author Peter Jones
 */
final class MultiplexInputStream extends InputStream {
    /** object managing multiplexed connection */
    private ConnectionMultiplexer manager;
    /** information about the connection this is the input stream for */
    private MultiplexConnectionInfo info;
    /** input buffer */
    private byte buffer[];
    /** number of real data bytes present in buffer */
    private int present = 0;
    /** current position to read from in input buffer */
    private int pos = 0;
    /** pending number of bytes this stream has requested */
    private int requested = 0;
    /** true if this connection has been disconnected */
    private boolean disconnected = false;
    /**
     * lock acquired to access shared variables:
     * buffer, present, pos, requested, & disconnected
     * WARNING:  Any of the methods manager.send*() should not be
     * invoked while this lock is held, since they could potentially
     * block if the underlying connection's transport buffers are
     * full, and the manager may need to acquire this lock to process
     * and consume data coming over the underlying connection.
     */
    private Object lock = new Object();
    /** level at which more data is requested when read past */
    private int waterMark;
    /** data structure for holding reads of one byte */
    private byte temp[] = new byte[1];
    /**
     * Create a new MultiplexInputStream for the given manager.
     * @param manager object that manages this connection
     * @param info structure for connection this stream reads from
     * @param bufferLength length of input buffer
     */
    MultiplexInputStream(
        ConnectionMultiplexer    manager,
        MultiplexConnectionInfo  info,
        int                      bufferLength)
    {
        this.manager = manager;
        this.info    = info;
        buffer = new byte[bufferLength];
        waterMark = bufferLength / 2;
    }
    /**
     * Read a byte from the connection.
     */
    public synchronized int read() throws IOException
    {
        int n = read(temp, 0, 1);
        if (n != 1)
            return -1;
        return temp[0] & 0xFF;
    }
    /**
     * Read a subarray of bytes from connection.  This method blocks for
     * at least one byte, and it returns the number of bytes actually read,
     * or -1 if the end of the stream was detected.
     * @param b array to read bytes into
     * @param off offset of beginning of bytes to read into
     * @param len number of bytes to read
     */
    public synchronized int read(byte b[], int off, int len) throws IOException
    {
        if (len <= 0)
            return 0;
        int moreSpace;
        synchronized (lock) {
            if (pos >= present)
                pos = present = 0;
            else if (pos >= waterMark) {
                System.arraycopy(buffer, pos, buffer, 0, present - pos);
                present -= pos;
                pos = 0;
            }
            int freeSpace = buffer.length - present;
            moreSpace = Math.max(freeSpace - requested, 0);
        }
        if (moreSpace > 0)
            manager.sendRequest(info, moreSpace);
        synchronized (lock) {
            requested += moreSpace;
            while ((pos >= present) && !disconnected) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                }
            }
            if (disconnected && pos >= present)
                return -1;
            int available = present - pos;
            if (len < available) {
                System.arraycopy(buffer, pos, b, off, len);
                pos += len;
                return len;
            }
            else {
                System.arraycopy(buffer, pos, b, off, available);
                pos = present = 0;
                // could send another request here, if len > available??
                return available;
            }
        }
    }
    /**
     * Return the number of bytes immediately available for reading.
     */
    public int available() throws IOException
    {
        synchronized (lock) {
            return present - pos;
        }
    }
    /**
     * Close this connection.
     */
    public void close() throws IOException
    {
        manager.sendClose(info);
    }
    /**
     * Receive bytes transmitted from connection at remote endpoint.
     * @param length number of bytes transmitted
     * @param in input stream with those bytes ready to be read
     */
    void receive(int length, DataInputStream in)
        throws IOException
    {
        /* TO DO: Optimize so that data received from stream can be loaded
         * directly into user's buffer if there is a pending read().
         */
        synchronized (lock) {
            if ((pos > 0) && ((buffer.length - present) < length)) {
                System.arraycopy(buffer, pos, buffer, 0, present - pos);
                present -= pos;
                pos = 0;
            }
            if ((buffer.length - present) < length)
                throw new IOException("Receive buffer overflow");
            in.readFully(buffer, present, length);
            present += length;
            requested -= length;
            lock.notifyAll();
        }
    }
    /**
     * Disconnect this stream from all connection activity.
     */
    void disconnect()
    {
        synchronized (lock) {
            disconnected = true;
            lock.notifyAll();
        }
    }
}
Back to index...