1 /*
   2  * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.net.*;
  25 import java.io.*;
  26 import java.nio.*;
  27 import java.nio.channels.*;
  28 import sun.net.www.MessageHeader;
  29 import java.util.*;
  30 
  31 /**
  32  * This class implements a simple HTTP server. It uses multiple threads to
  33  * handle connections in parallel, and also multiple connections/requests
  34  * can be handled per thread.
  35  * <p>
  36  * It must be instantiated with a {@link HttpCallback} object to which
  37  * requests are given and must be handled.
  38  * <p>
  39  * Simple synchronization between the client(s) and server can be done
  40  * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
  41  * {@link #rendezvous(String,int)} methods.
  42  *
  43  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  44  *
 * If changes are made here, please make sure they are propagated to
  46  * the HTTPS equivalent in the JSSE regression test suite.
  47  *
  48  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  49  */
  50 
  51 public class TestHttpServer {
  52 
  53     ServerSocketChannel schan;
  54     int threads;
  55     int cperthread;
  56     HttpCallback cb;
  57     Server[] servers;
  58 
  59     /**
  60      * Create a <code>TestHttpServer<code> instance with the specified callback object
  61      * for handling requests. One thread is created to handle requests,
  62      * and up to ten TCP connections will be handled simultaneously.
  63      * @param cb the callback object which is invoked to handle each
  64      *  incoming request
  65      */
  66 
  67     public TestHttpServer (HttpCallback cb) throws IOException {
  68         this (cb, 1, 10, 0);
  69     }
  70 
  71     /**
  72      * Create a <code>TestHttpServer<code> instance with the specified callback object
  73      * for handling requests. One thread is created to handle requests,
  74      * and up to ten TCP connections will be handled simultaneously.
  75      * @param cb the callback object which is invoked to handle each
  76      *  incoming request
  77      * @param address the address to bind the server to. <code>Null</code>
  78      *  means bind to the wildcard address.
  79      * @param port the port number to bind the server to. <code>Zero</code>
  80      *  means choose any free port.
  81      */
  82 
  83     public TestHttpServer (HttpCallback cb, InetAddress address, int port) throws IOException {
  84         this (cb, 1, 10, address, 0);
  85     }
  86 
  87     /**
  88      * Create a <code>TestHttpServer<code> instance with the specified number of
  89      * threads and maximum number of connections per thread. This functions
  90      * the same as the 4 arg constructor, where the port argument is set to zero.
  91      * @param cb the callback object which is invoked to handle each
  92      *     incoming request
  93      * @param threads the number of threads to create to handle requests
  94      *     in parallel
  95      * @param cperthread the number of simultaneous TCP connections to
  96      *     handle per thread
  97      */
  98 
  99     public TestHttpServer (HttpCallback cb, int threads, int cperthread)
 100         throws IOException {
 101         this (cb, threads, cperthread, 0);
 102     }
 103 
 104     /**
 105      * Create a <code>TestHttpServer<code> instance with the specified number
 106      * of threads and maximum number of connections per thread and running on
 107      * the specified port. The specified number of threads are created to
 108      * handle incoming requests, and each thread is allowed
 109      * to handle a number of simultaneous TCP connections.
 110      * @param cb the callback object which is invoked to handle
 111      *  each incoming request
 112      * @param threads the number of threads to create to handle
 113      *  requests in parallel
 114      * @param cperthread the number of simultaneous TCP connections
 115      *  to handle per thread
 116      * @param port the port number to bind the server to. <code>Zero</code>
 117      *  means choose any free port.
 118      */
 119 
 120     public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port)
 121             throws IOException {
 122         this(cb, threads, cperthread, null, port);
 123     }
 124 
 125     /**
 126      * Create a <code>TestHttpServer<code> instance with the specified number
 127      * of threads and maximum number of connections per thread and running on
 128      * the specified port. The specified number of threads are created to
 129      * handle incoming requests, and each thread is allowed
 130      * to handle a number of simultaneous TCP connections.
 131      * @param cb the callback object which is invoked to handle
 132      *  each incoming request
 133      * @param threads the number of threads to create to handle
 134      *  requests in parallel
 135      * @param cperthread the number of simultaneous TCP connections
 136      *  to handle per thread
 137      * @param address the address to bind the server to. <code>Null</code>
 138      *  means bind to the wildcard address.
 139      * @param port the port number to bind the server to. <code>Zero</code>
 140      *  means choose any free port.
 141      */
 142 
 143     public TestHttpServer (HttpCallback cb, int threads, int cperthread,
 144                            InetAddress address, int port)
 145         throws IOException {
 146         schan = ServerSocketChannel.open ();
 147         InetSocketAddress addr = new InetSocketAddress (address, port);
 148         schan.socket().bind (addr);
 149         this.threads = threads;
 150         this.cb = cb;
 151         this.cperthread = cperthread;
 152         servers = new Server [threads];
 153         for (int i=0; i<threads; i++) {
 154             servers[i] = new Server (cb, schan, cperthread);
 155             servers[i].start();
 156         }
 157     }
 158 
 159     /**
 160      * Tell all threads in the server to exit within 5 seconds.
 161      * This is an abortive termination. Just prior to the thread exiting
 162      * all channels in that thread waiting to be closed are forceably closed.
 163      * @throws InterruptedException
 164      */
 165 
 166     public void terminate () {
 167         for (int i=0; i<threads; i++) {
 168             servers[i].terminate ();
 169         }
 170 
 171         for (int i = 0; i < threads; i++) {
 172             try {
 173                 servers[i].join();
 174             } catch (InterruptedException e) {
 175                 System.err.println("Unexpected InterruptedException during terminating server");
 176                 throw new RuntimeException(e);
 177             }
 178         }
 179     }
 180 
 181     /**
 182      * return the local port number to which the server is bound.
 183      * @return the local port number
 184      */
 185 
 186     public int getLocalPort () {
 187         return schan.socket().getLocalPort ();
 188     }
 189 
 190     public String getAuthority() {
 191         InetAddress address = schan.socket().getInetAddress();
 192         String hostaddr = address.getHostAddress();
 193         if (address.isAnyLocalAddress()) hostaddr = "localhost";
 194         if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";
 195         return hostaddr + ":" + getLocalPort();
 196     }
 197 
 198     static class Server extends Thread {
 199 
 200         ServerSocketChannel schan;
 201         Selector selector;
 202         SelectionKey listenerKey;
 203         SelectionKey key; /* the current key being processed */
 204         HttpCallback cb;
 205         ByteBuffer consumeBuffer;
 206         int maxconn;
 207         int nconn;
 208         ClosedChannelList clist;
 209         volatile boolean shutdown;
 210 
 211         Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {
 212             this.schan = schan;
 213             this.maxconn = maxconn;
 214             this.cb = cb;
 215             nconn = 0;
 216             consumeBuffer = ByteBuffer.allocate (512);
 217             clist = new ClosedChannelList ();
 218             try {
 219                 selector = Selector.open ();
 220                 schan.configureBlocking (false);
 221                 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 222             } catch (IOException e) {
 223                 System.err.println ("Server could not start: " + e);
 224                 throw new RuntimeException("Server could not start: " + e, e);
 225             }
 226         }
 227 
 228         /* Stop the thread as soon as possible */
 229         public void terminate () {
 230             shutdown = true;
 231         }
 232 
 233         public void run ()  {
 234             try {
 235                 while (true) {
 236                     selector.select(1000);
 237                     Set<SelectionKey> selected = selector.selectedKeys();
 238                     Iterator<SelectionKey> iter = selected.iterator();
 239                     while (iter.hasNext()) {
 240                         key = iter.next();
 241                         if (key.equals (listenerKey)) {
 242                             SocketChannel sock = schan.accept ();
 243                             if (sock == null) {
 244                                 /* false notification */
 245                                 iter.remove();
 246                                 continue;
 247                             }
 248                             sock.configureBlocking (false);
 249                             sock.register (selector, SelectionKey.OP_READ);
 250                             nconn ++;
 251                             System.out.println("SERVER: new connection. chan[" + sock + "]");
 252                             if (nconn == maxconn) {
 253                                 /* deregister */
 254                                 listenerKey.cancel ();
 255                                 listenerKey = null;
 256                             }
 257                         } else {
 258                             if (key.isReadable()) {
 259                                 boolean closed;
 260                                 SocketChannel chan = (SocketChannel) key.channel();
 261                                 System.out.println("SERVER: connection readable. chan[" + chan + "]");
 262                                 if (key.attachment() != null) {
 263                                     System.out.println("Server: consume");
 264                                     closed = consume (chan);
 265                                 } else {
 266                                     closed = read (chan, key);
 267                                 }
 268                                 if (closed) {
 269                                     chan.close ();
 270                                     key.cancel ();
 271                                     if (nconn == maxconn) {
 272                                         listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 273                                     }
 274                                     nconn --;
 275                                 }
 276                             }
 277                         }
 278                         iter.remove();
 279                     }
 280                     clist.check();
 281                     if (shutdown) {
 282                         System.out.println("Force to Shutdown");
 283                         SelectionKey sKey = schan.keyFor(selector);
 284                         if (sKey != null) {
 285                             sKey.cancel();
 286                         }
 287 
 288                         clist.terminate ();
 289                         selector.close();
 290                         schan.socket().close();
 291                         schan.close();
 292                         return;
 293                     }
 294                 }
 295             } catch (IOException e) {
 296                 System.out.println ("Server exception: " + e);
 297                 // TODO finish
 298             }
 299         }
 300 
 301         /* read all the data off the channel without looking at it
 302              * return true if connection closed
 303              */
 304         boolean consume (SocketChannel chan) {
 305             try {
 306                 consumeBuffer.clear ();
 307                 int c = chan.read (consumeBuffer);
 308                 if (c == -1)
 309                     return true;
 310             } catch (IOException e) {
 311                 return true;
 312             }
 313             return false;
 314         }
 315 
 316         /* return true if the connection is closed, false otherwise */
 317 
 318         private boolean read (SocketChannel chan, SelectionKey key) {
 319             HttpTransaction msg;
 320             boolean res;
 321             try {
 322                 InputStream is = new BufferedInputStream (new NioInputStream (chan));
 323                 String requestline = readLine (is);
 324                 MessageHeader mhead = new MessageHeader (is);
 325                 String clen = mhead.findValue ("Content-Length");
 326                 String trferenc = mhead.findValue ("Transfer-Encoding");
 327                 String data = null;
 328                 if (trferenc != null && trferenc.equals ("chunked"))
 329                     data = new String (readChunkedData (is));
 330                 else if (clen != null)
 331                     data = new String (readNormalData (is, Integer.parseInt (clen)));
 332                 String[] req = requestline.split (" ");
 333                 if (req.length < 2) {
 334                     /* invalid request line */
 335                     return false;
 336                 }
 337                 String cmd = req[0];
 338                 URI uri = null;
 339                 try {
 340                     uri = new URI (req[1]);
 341                     msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key);
 342                     cb.request (msg);
 343                 } catch (URISyntaxException e) {
 344                     System.err.println ("Invalid URI: " + e);
 345                     msg = new HttpTransaction (this, cmd, null, null, null, null, key);
 346                     msg.sendResponse (501, "Whatever");
 347                 }
 348                 res = false;
 349             } catch (IOException e) {
 350                 res = true;
 351             }
 352             return res;
 353         }
 354 
 355         byte[] readNormalData (InputStream is, int len) throws IOException {
 356             byte [] buf  = new byte [len];
 357             int c, off=0, remain=len;
 358             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
 359                 remain -= c;
 360                 off += c;
 361             }
 362             return buf;
 363         }
 364 
 365         private void readCRLF(InputStream is) throws IOException {
 366             int cr = is.read();
 367             int lf = is.read();
 368 
 369             if (((cr & 0xff) != 0x0d) ||
 370                 ((lf & 0xff) != 0x0a)) {
 371                 throw new IOException(
 372                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
 373             }
 374         }
 375 
 376         byte[] readChunkedData (InputStream is) throws IOException {
 377             LinkedList l = new LinkedList ();
 378             int total = 0;
 379             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
 380                 l.add (readNormalData(is, len));
 381                 total += len;
 382                 readCRLF(is);  // CRLF at end of chunk
 383             }
 384             readCRLF(is); // CRLF at end of Chunked Stream.
 385             byte[] buf = new byte [total];
 386             Iterator i = l.iterator();
 387             int x = 0;
 388             while (i.hasNext()) {
 389                 byte[] b = (byte[])i.next();
 390                 System.arraycopy (b, 0, buf, x, b.length);
 391                 x += b.length;
 392             }
 393             return buf;
 394         }
 395 
 396         private int readChunkLen (InputStream is) throws IOException {
 397             int c, len=0;
 398             boolean done=false, readCR=false;
 399             while (!done) {
 400                 c = is.read ();
 401                 if (c == '\n' && readCR) {
 402                     done = true;
 403                 } else {
 404                     if (c == '\r' && !readCR) {
 405                         readCR = true;
 406                     } else {
 407                         int x=0;
 408                         if (c >= 'a' && c <= 'f') {
 409                             x = c - 'a' + 10;
 410                         } else if (c >= 'A' && c <= 'F') {
 411                             x = c - 'A' + 10;
 412                         } else if (c >= '0' && c <= '9') {
 413                             x = c - '0';
 414                         }
 415                         len = len * 16 + x;
 416                     }
 417                 }
 418             }
 419             return len;
 420         }
 421 
 422         private String readLine (InputStream is) throws IOException {
 423             boolean done=false, readCR=false;
 424             byte[] b = new byte [512];
 425             int c, l = 0;
 426 
 427             while (!done) {
 428                 c = is.read ();
 429                 if (c == '\n' && readCR) {
 430                     done = true;
 431                 } else {
 432                     if (c == '\r' && !readCR) {
 433                         readCR = true;
 434                     } else {
 435                         b[l++] = (byte)c;
 436                     }
 437                 }
 438             }
 439             return new String (b);
 440         }
 441 
 442         /** close the channel associated with the current key by:
 443          * 1. shutdownOutput (send a FIN)
 444          * 2. mark the key so that incoming data is to be consumed and discarded
 445          * 3. After a period, close the socket
 446          */
 447 
 448         synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
 449             SocketChannel ch = (SocketChannel)key.channel ();
 450             System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]");
 451             ch.socket().shutdownOutput();
 452             key.attach (this);
 453             clist.add (key);
 454         }
 455 
 456         synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
 457             SocketChannel ch = (SocketChannel)key.channel ();
 458             System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]");
 459 
 460             Socket s = ch.socket ();
 461             s.setSoLinger (true, 0);
 462             ch.close();
 463         }
 464     }
 465 
 466 
 467     /**
 468      * Implements blocking reading semantics on top of a non-blocking channel
 469      */
 470 
 471     static class NioInputStream extends InputStream {
 472         SocketChannel channel;
 473         Selector selector;
 474         ByteBuffer chanbuf;
 475         SelectionKey key;
 476         int available;
 477         byte[] one;
 478         boolean closed;
 479         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
 480         boolean marked;
 481         boolean reset;
 482         int readlimit;
 483 
 484         public NioInputStream (SocketChannel chan) throws IOException {
 485             this.channel = chan;
 486             selector = Selector.open();
 487             chanbuf = ByteBuffer.allocate (1024);
 488             key = chan.register (selector, SelectionKey.OP_READ);
 489             available = 0;
 490             one = new byte[1];
 491             closed = marked = reset = false;
 492         }
 493 
 494         public synchronized int read (byte[] b) throws IOException {
 495             return read (b, 0, b.length);
 496         }
 497 
 498         public synchronized int read () throws IOException {
 499             return read (one, 0, 1);
 500         }
 501 
 502         public synchronized int read (byte[] b, int off, int srclen) throws IOException {
 503 
 504             int canreturn, willreturn;
 505 
 506             if (closed)
 507                 return -1;
 508 
 509             if (reset) { /* satisfy from markBuf */
 510                 canreturn = markBuf.remaining ();
 511                 willreturn = canreturn>srclen ? srclen : canreturn;
 512                 markBuf.get(b, off, willreturn);
 513                 if (canreturn == willreturn) {
 514                     reset = false;
 515                 }
 516             } else { /* satisfy from channel */
 517                 canreturn = available();
 518                 if (canreturn == 0) {
 519                     block ();
 520                     canreturn = available();
 521                 }
 522                 willreturn = canreturn>srclen ? srclen : canreturn;
 523                 chanbuf.get(b, off, willreturn);
 524                 available -= willreturn;
 525 
 526                 if (marked) { /* copy into markBuf */
 527                     try {
 528                         markBuf.put (b, off, willreturn);
 529                     } catch (BufferOverflowException e) {
 530                         marked = false;
 531                     }
 532                 }
 533             }
 534             return willreturn;
 535         }
 536 
 537         public synchronized int available () throws IOException {
 538             if (closed)
 539                 throw new IOException ("Stream is closed");
 540 
 541             if (reset)
 542                 return markBuf.remaining();
 543 
 544             if (available > 0)
 545                 return available;
 546 
 547             chanbuf.clear ();
 548             available = channel.read (chanbuf);
 549             if (available > 0)
 550                 chanbuf.flip();
 551             else if (available == -1)
 552                 throw new IOException ("Stream is closed");
 553             return available;
 554         }
 555 
 556         /**
 557          * block() only called when available==0 and buf is empty
 558          */
 559         private synchronized void block () throws IOException {
 560             //assert available == 0;
 561             int n = selector.select ();
 562             //assert n == 1;
 563             selector.selectedKeys().clear();
 564             available ();
 565         }
 566 
 567         public void close () throws IOException {
 568             if (closed)
 569                 return;
 570             channel.close ();
 571             closed = true;
 572         }
 573 
 574         public synchronized void mark (int readlimit) {
 575             if (closed)
 576                 return;
 577             this.readlimit = readlimit;
 578             markBuf = ByteBuffer.allocate (readlimit);
 579             marked = true;
 580             reset = false;
 581         }
 582 
 583         public synchronized void reset () throws IOException {
 584             if (closed )
 585                 return;
 586             if (!marked)
 587                 throw new IOException ("Stream not marked");
 588             marked = false;
 589             reset = true;
 590             markBuf.flip ();
 591         }
 592     }
 593 
 594     static class NioOutputStream extends OutputStream {
 595         SocketChannel channel;
 596         ByteBuffer buf;
 597         SelectionKey key;
 598         Selector selector;
 599         boolean closed;
 600         byte[] one;
 601 
 602         public NioOutputStream (SocketChannel channel) throws IOException {
 603             this.channel = channel;
 604             selector = Selector.open ();
 605             key = channel.register (selector, SelectionKey.OP_WRITE);
 606             closed = false;
 607             one = new byte [1];
 608         }
 609 
 610         public synchronized void write (int b) throws IOException {
 611             one[0] = (byte)b;
 612             write (one, 0, 1);
 613         }
 614 
 615         public synchronized void write (byte[] b) throws IOException {
 616             write (b, 0, b.length);
 617         }
 618 
 619         public synchronized void write (byte[] b, int off, int len) throws IOException {
 620             if (closed)
 621                 throw new IOException ("stream is closed");
 622 
 623             buf = ByteBuffer.allocate (len);
 624             buf.put (b, off, len);
 625             buf.flip ();
 626             int n;
 627             while ((n = channel.write (buf)) < len) {
 628                 len -= n;
 629                 if (len == 0)
 630                     return;
 631                 selector.select ();
 632                 selector.selectedKeys().clear ();
 633             }
 634         }
 635 
 636         public void close () throws IOException {
 637             if (closed)
 638                 return;
 639             channel.close ();
 640             closed = true;
 641         }
 642     }
 643 
 644     /**
 645      * Utilities for synchronization. A condition is
 646      * identified by a string name, and is initialized
 647      * upon first use (ie. setCondition() or waitForCondition()). Threads
 648      * are blocked until some thread calls (or has called) setCondition() for the same
 649      * condition.
 650      * <P>
 651      * A rendezvous built on a condition is also provided for synchronizing
 652      * N threads.
 653      */
 654 
 655     private static HashMap conditions = new HashMap();
 656 
 657     /*
 658      * Modifiable boolean object
 659      */
 660     private static class BValue {
 661         boolean v;
 662     }
 663 
 664     /*
 665      * Modifiable int object
 666      */
 667     private static class IValue {
 668         int v;
 669         IValue (int i) {
 670             v =i;
 671         }
 672     }
 673 
 674 
 675     private static BValue getCond (String condition) {
 676         synchronized (conditions) {
 677             BValue cond = (BValue) conditions.get (condition);
 678             if (cond == null) {
 679                 cond = new BValue();
 680                 conditions.put (condition, cond);
 681             }
 682             return cond;
 683         }
 684     }
 685 
 686     /**
 687      * Set the condition to true. Any threads that are currently blocked
 688      * waiting on the condition, will be unblocked and allowed to continue.
 689      * Threads that subsequently call waitForCondition() will not block.
 690      * If the named condition did not exist prior to the call, then it is created
 691      * first.
 692      */
 693 
 694     public static void setCondition (String condition) {
 695         BValue cond = getCond (condition);
 696         synchronized (cond) {
 697             if (cond.v) {
 698                 return;
 699             }
 700             cond.v = true;
 701             cond.notifyAll();
 702         }
 703     }
 704 
 705     /**
 706      * If the named condition does not exist, then it is created and initialized
 707      * to false. If the condition exists or has just been created and its value
 708      * is false, then the thread blocks until another thread sets the condition.
 709      * If the condition exists and is already set to true, then this call returns
 710      * immediately without blocking.
 711      */
 712 
 713     public static void waitForCondition (String condition) {
 714         BValue cond = getCond (condition);
 715         synchronized (cond) {
 716             if (!cond.v) {
 717                 try {
 718                     cond.wait();
 719                 } catch (InterruptedException e) {}
 720             }
 721         }
 722     }
 723 
 724     /* conditions must be locked when accessing this */
 725     static HashMap rv = new HashMap();
 726 
 727     /**
 728      * Force N threads to rendezvous (ie. wait for each other) before proceeding.
 729      * The first thread(s) to call are blocked until the last
 730      * thread makes the call. Then all threads continue.
 731      * <p>
 732      * All threads that call with the same condition name, must use the same value
 733      * for N (or the results may be not be as expected).
 734      * <P>
 735      * Obviously, if fewer than N threads make the rendezvous then the result
 736      * will be a hang.
 737      */
 738 
 739     public static void rendezvous (String condition, int N) {
 740         BValue cond;
 741         IValue iv;
 742         String name = "RV_"+condition;
 743 
 744         /* get the condition */
 745 
 746         synchronized (conditions) {
 747             cond = (BValue)conditions.get (name);
 748             if (cond == null) {
 749                 /* we are first caller */
 750                 if (N < 2) {
 751                     throw new RuntimeException ("rendezvous must be called with N >= 2");
 752                 }
 753                 cond = new BValue ();
 754                 conditions.put (name, cond);
 755                 iv = new IValue (N-1);
 756                 rv.put (name, iv);
 757             } else {
 758                 /* already initialised, just decrement the counter */
 759                 iv = (IValue) rv.get (name);
 760                 iv.v --;
 761             }
 762         }
 763 
 764         if (iv.v > 0) {
 765             waitForCondition (name);
 766         } else {
 767             setCondition (name);
 768             synchronized (conditions) {
 769                 clearCondition (name);
 770                 rv.remove (name);
 771             }
 772         }
 773     }
 774 
 775     /**
 776      * If the named condition exists and is set then remove it, so it can
 777      * be re-initialized and used again. If the condition does not exist, or
 778      * exists but is not set, then the call returns without doing anything.
 779      * Note, some higher level synchronization
 780      * may be needed between clear and the other operations.
 781      */
 782 
 783     public static void clearCondition(String condition) {
 784         BValue cond;
 785         synchronized (conditions) {
 786             cond = (BValue) conditions.get (condition);
 787             if (cond == null) {
 788                 return;
 789             }
 790             synchronized (cond) {
 791                 if (cond.v) {
 792                     conditions.remove (condition);
 793                 }
 794             }
 795         }
 796     }
 797 }